From 54db247d12192629d38471f08664dabf8eb23574 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 11:07:09 +0100 Subject: [PATCH 01/30] serialization for windows --- .../core/kernels/bigtable/serialization.cc | 330 +++++++++++------- 1 file changed, 200 insertions(+), 130 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index aa14fdefc..40266c784 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -1,130 +1,200 @@ -/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -============================================================================== -*/ - -#include "tensorflow_io/core/kernels/bigtable/serialization.h" - -#include "rpc/xdr.h" -#include "tensorflow/core/platform/errors.h" -#include "tensorflow/core/platform/statusor.h" - -namespace tensorflow { -namespace io { - -inline StatusOr BytesToFloat(std::string const& s) { - float v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_float(&xdrs, &v)) { - return errors::InvalidArgument("Error reading float from byte array."); - } - return v; -} - -inline StatusOr BytesToDouble(std::string const& s) { - double v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_double(&xdrs, &v)) { - return errors::InvalidArgument("Error reading double from byte array."); - } - return v; -} - -inline StatusOr BytesToInt64(std::string const& s) { - int64_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int64_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int64 from byte array."); - } - return v; -} - -inline StatusOr BytesToInt32(std::string const& s) { - int32_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int32_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int32 from byte array."); - } - return v; -} - -inline StatusOr BytesToBool(std::string const& s) { - bool_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_bool(&xdrs, &v)) { - return errors::InvalidArgument("Error reading bool from byte array."); - } - return v; -} - -Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) { - switch (cell_type) { - case DT_STRING: { - auto tensor_data = tensor.tensor(); - tensor_data(index) = std::string(cell.value()); - } break; - case DT_BOOL: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBool(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT32: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT64: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_FLOAT: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloat(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_DOUBLE: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDouble(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - default: - return errors::Unimplemented("Data type not supported."); - } - return Status::OK(); -} - -} // namespace io -} // namespace tensorflow +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============================================================================== +*/ + +#include "tensorflow_io/core/kernels/bigtable/serialization.h" + +#include "tensorflow/core/platform/errors.h" +#include "tensorflow/core/platform/statusor.h" + +namespace tensorflow { +namespace io { +namespace { +#ifdef _WIN32 + +#include + +inline StatusOr BytesToBool(const std::string &bytes) { + union { + char byte; + bool res; + } u; + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); + } + u.byte = bytes[0]; + return u.res; +} + +inline StatusOr BytesToInt32(const std::string &bytes) { + union { + char bytes[4]; + uint32_t res; + } u; + if (bytes.size() != 4U) { + return errors::InvalidArgument("Invalid int32 representation."); + } + memcpy(u.bytes, bytes.data(), 4); + return ntohl(u.res); +} + +inline StatusOr BytesToInt64(const std::string &bytes) { + union { + char bytes[8]; + uint32_t res; + } u; + if (bytes.size() != 8U) { + return errors::InvalidArgument("Invalid int64 representation."); + } + memcpy(u.bytes, bytes.data(), 8); + return ntohl(u.res); // <======= FIXME! BUG! HELP! +} + +inline StatusOr BytesToFloat(std::string const& s) { + auto const int_rep = BytesToInt32(s); + if (!int_rep.ok()) { + return int_rep; + } + union { + float res; + uint32_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + +inline StatusOr BytesToDouble(std::string const& s) { + auto const int_rep = BytesToInt64(s); + if (!int_rep.ok()) { + return int_rep; + } + union { + double res; + uint64_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} +#else // _WIN32 + +#include "rpc/xdr.h" + +inline StatusOr BytesToFloat(std::string const& s) { + float v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_float(&xdrs, &v)) { + return errors::InvalidArgument("Error reading float from byte array."); + } + return v; +} + +inline StatusOr BytesToDouble(std::string const& s) { + double v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_double(&xdrs, &v)) { + return errors::InvalidArgument("Error reading double from byte array."); + } + return v; +} + +inline StatusOr BytesToInt64(std::string const& s) { + int64_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int64_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int64 from byte array."); + } + return v; +} + +inline StatusOr BytesToInt32(std::string const& s) { + int32_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int32_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int32 from byte array."); + } + return v; +} + +inline StatusOr BytesToBool(std::string const& s) { + bool_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_bool(&xdrs, &v)) { + return errors::InvalidArgument("Error reading bool from byte array."); + } + return v; +} + +#endif // _WIN32 +} // namespace +Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) { + switch (cell_type) { + case DT_STRING: { + auto tensor_data = tensor.tensor(); + tensor_data(index) = std::string(cell.value()); + } break; + case DT_BOOL: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToBool(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT32: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt32(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT64: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt64(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_FLOAT: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToFloat(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_DOUBLE: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToDouble(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + default: + return errors::Unimplemented("Data type not supported."); + } + return Status::OK(); +} + +} // namespace io +} // namespace tensorflow From 9fb5a99e7f561bdf456508d99d0369560d33e41e Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 14:32:31 +0100 Subject: [PATCH 02/30] patched com_google_googleapis to not include OPTIONAL keyword --- WORKSPACE | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/WORKSPACE b/WORKSPACE index 8d94e8f63..fb98ed26b 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -109,9 +109,16 @@ http_archive( ) # Note com_google_googleapis is placed earlier as we need to adjust switched_rules_by_language option +# Note we have to change one word in the field_behavior.proto so it compiles on WINDOWS +# for more infor please refer to https://github.com/protocolbuffers/protobuf/issues/7076 http_archive( name = "com_google_googleapis", build_file = "@com_github_googleapis_google_cloud_cpp//bazel:googleapis.BUILD", + patch_cmds = [ + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", + ], sha256 = "a53e15405f81d5a32594d7f6486e649131fadda5431cf28377dff4ae54d45d16", strip_prefix = "googleapis-d4d09eb3aec152015f35717102f9b423988b94f7", urls = [ From 16b61979d3d4dfa91f83661956f7e063f26800b4 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 14:48:50 +0100 Subject: [PATCH 03/30] linting --- WORKSPACE | 6 +++--- bld.sh | 9 +++++++++ .../core/kernels/bigtable/serialization.cc | 6 +++--- test.py | 14 ++++++++++++++ 4 files changed, 29 insertions(+), 6 deletions(-) create mode 100755 bld.sh create mode 100644 test.py diff --git a/WORKSPACE b/WORKSPACE index fb98ed26b..4516171d4 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -115,9 +115,9 @@ http_archive( name = "com_google_googleapis", build_file = "@com_github_googleapis_google_cloud_cpp//bazel:googleapis.BUILD", patch_cmds = [ - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", ], sha256 = "a53e15405f81d5a32594d7f6486e649131fadda5431cf28377dff4ae54d45d16", strip_prefix = "googleapis-d4d09eb3aec152015f35717102f9b423988b94f7", diff --git a/bld.sh b/bld.sh new file mode 100755 index 000000000..e3726be67 --- /dev/null +++ b/bld.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +ls dist/* +for f in dist/*.whl; do + docker run -i --rm -v $PWD:/v -w /v --net=host quay.io/pypa/manylinux2010_x86_64 bash -x -e /v/tools/build/auditwheel repair --plat manylinux2010_x86_64 $f +done +sudo chown -R $(id -nu):$(id -ng) . +ls wheelhouse/* + diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 40266c784..8a492e634 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -26,7 +26,7 @@ namespace { #include -inline StatusOr BytesToBool(const std::string &bytes) { +inline StatusOr BytesToBool(const std::string& bytes) { union { char byte; bool res; @@ -38,7 +38,7 @@ inline StatusOr BytesToBool(const std::string &bytes) { return u.res; } -inline StatusOr BytesToInt32(const std::string &bytes) { +inline StatusOr BytesToInt32(const std::string& bytes) { union { char bytes[4]; uint32_t res; @@ -50,7 +50,7 @@ inline StatusOr BytesToInt32(const std::string &bytes) { return ntohl(u.res); } -inline StatusOr BytesToInt64(const std::string &bytes) { +inline StatusOr BytesToInt64(const std::string& bytes) { union { char bytes[8]; uint32_t res; diff --git a/test.py b/test.py new file mode 100644 index 000000000..17535e49a --- /dev/null +++ b/test.py @@ -0,0 +1,14 @@ +import tensorflow_io as tfio +from tensorflow_io import bigtable as bt + +_=[print(x) for x in dir(bt)] + +c = bt.BigtableClient("test-project", "test-instance") +t = c.get_table("t1") +row_s = row_set.from_rows_or_ranges(row_range.closed_range("row000", "row009")) + +read_rows = [ + r for r in t.read_rows(["cf1:c1"], row_set=row_s) +] +print(read_rows) + From d546f242f0002ada6b1cad9f75403a050629354b Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 15:01:02 +0100 Subject: [PATCH 04/30] linux line endings --- .../core/kernels/bigtable/serialization.cc | 400 +++++++++--------- 1 file changed, 200 insertions(+), 200 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 8a492e634..2a0bc8042 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -1,200 +1,200 @@ -/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -============================================================================== -*/ - -#include "tensorflow_io/core/kernels/bigtable/serialization.h" - -#include "tensorflow/core/platform/errors.h" -#include "tensorflow/core/platform/statusor.h" - -namespace tensorflow { -namespace io { -namespace { -#ifdef _WIN32 - -#include - -inline StatusOr BytesToBool(const std::string& bytes) { - union { - char byte; - bool res; - } u; - if (bytes.size() != 1U) { - return errors::InvalidArgument("Invalid bool representation."); - } - u.byte = bytes[0]; - return u.res; -} - -inline StatusOr BytesToInt32(const std::string& bytes) { - union { - char bytes[4]; - uint32_t res; - } u; - if (bytes.size() != 4U) { - return errors::InvalidArgument("Invalid int32 representation."); - } - memcpy(u.bytes, bytes.data(), 4); - return ntohl(u.res); -} - -inline StatusOr BytesToInt64(const std::string& bytes) { - union { - char bytes[8]; - uint32_t res; - } u; - if (bytes.size() != 8U) { - return errors::InvalidArgument("Invalid int64 representation."); - } - memcpy(u.bytes, bytes.data(), 8); - return ntohl(u.res); // <======= FIXME! BUG! HELP! -} - -inline StatusOr BytesToFloat(std::string const& s) { - auto const int_rep = BytesToInt32(s); - if (!int_rep.ok()) { - return int_rep; - } - union { - float res; - uint32_t int_rep; - } u; - u.int_rep = *int_rep; - return u.res; -} - -inline StatusOr BytesToDouble(std::string const& s) { - auto const int_rep = BytesToInt64(s); - if (!int_rep.ok()) { - return int_rep; - } - union { - double res; - uint64_t int_rep; - } u; - u.int_rep = *int_rep; - return u.res; -} -#else // _WIN32 - -#include "rpc/xdr.h" - -inline StatusOr BytesToFloat(std::string const& s) { - float v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_float(&xdrs, &v)) { - return errors::InvalidArgument("Error reading float from byte array."); - } - return v; -} - -inline StatusOr BytesToDouble(std::string const& s) { - double v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_double(&xdrs, &v)) { - return errors::InvalidArgument("Error reading double from byte array."); - } - return v; -} - -inline StatusOr BytesToInt64(std::string const& s) { - int64_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int64_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int64 from byte array."); - } - return v; -} - -inline StatusOr BytesToInt32(std::string const& s) { - int32_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int32_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int32 from byte array."); - } - return v; -} - -inline StatusOr BytesToBool(std::string const& s) { - bool_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_bool(&xdrs, &v)) { - return errors::InvalidArgument("Error reading bool from byte array."); - } - return v; -} - -#endif // _WIN32 -} // namespace -Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) { - switch (cell_type) { - case DT_STRING: { - auto tensor_data = tensor.tensor(); - tensor_data(index) = std::string(cell.value()); - } break; - case DT_BOOL: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBool(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT32: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT64: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_FLOAT: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloat(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_DOUBLE: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDouble(cell.value()); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - default: - return errors::Unimplemented("Data type not supported."); - } - return Status::OK(); -} - -} // namespace io -} // namespace tensorflow +/* Copyright 2021 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +============================================================================== +*/ + +#include "tensorflow_io/core/kernels/bigtable/serialization.h" + +#include "tensorflow/core/platform/errors.h" +#include "tensorflow/core/platform/statusor.h" + +namespace tensorflow { +namespace io { +namespace { +#ifdef _WIN32 + +#include + +inline StatusOr BytesToBool(const std::string& bytes) { + union { + char byte; + bool res; + } u; + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); + } + u.byte = bytes[0]; + return u.res; +} + +inline StatusOr BytesToInt32(const std::string& bytes) { + union { + char bytes[4]; + uint32_t res; + } u; + if (bytes.size() != 4U) { + return errors::InvalidArgument("Invalid int32 representation."); + } + memcpy(u.bytes, bytes.data(), 4); + return ntohl(u.res); +} + +inline StatusOr BytesToInt64(const std::string& bytes) { + union { + char bytes[8]; + uint32_t res; + } u; + if (bytes.size() != 8U) { + return errors::InvalidArgument("Invalid int64 representation."); + } + memcpy(u.bytes, bytes.data(), 8); + return ntohl(u.res); // <======= FIXME! BUG! HELP! +} + +inline StatusOr BytesToFloat(std::string const& s) { + auto const int_rep = BytesToInt32(s); + if (!int_rep.ok()) { + return int_rep; + } + union { + float res; + uint32_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + +inline StatusOr BytesToDouble(std::string const& s) { + auto const int_rep = BytesToInt64(s); + if (!int_rep.ok()) { + return int_rep; + } + union { + double res; + uint64_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} +#else // _WIN32 + +#include "rpc/xdr.h" + +inline StatusOr BytesToFloat(std::string const& s) { + float v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_float(&xdrs, &v)) { + return errors::InvalidArgument("Error reading float from byte array."); + } + return v; +} + +inline StatusOr BytesToDouble(std::string const& s) { + double v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_double(&xdrs, &v)) { + return errors::InvalidArgument("Error reading double from byte array."); + } + return v; +} + +inline StatusOr BytesToInt64(std::string const& s) { + int64_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int64_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int64 from byte array."); + } + return v; +} + +inline StatusOr BytesToInt32(std::string const& s) { + int32_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int32_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int32 from byte array."); + } + return v; +} + +inline StatusOr BytesToBool(std::string const& s) { + bool_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_bool(&xdrs, &v)) { + return errors::InvalidArgument("Error reading bool from byte array."); + } + return v; +} + +#endif // _WIN32 +} // namespace +Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) { + switch (cell_type) { + case DT_STRING: { + auto tensor_data = tensor.tensor(); + tensor_data(index) = std::string(cell.value()); + } break; + case DT_BOOL: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToBool(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT32: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt32(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT64: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt64(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_FLOAT: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToFloat(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_DOUBLE: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToDouble(cell.value()); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + default: + return errors::Unimplemented("Data type not supported."); + } + return Status::OK(); +} + +} // namespace io +} // namespace tensorflow From b4c1f70e970da0703a57f268b5a581f0c93553ab Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 15:27:32 +0100 Subject: [PATCH 05/30] passing cell instead of value --- .../core/kernels/bigtable/serialization.cc | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 2a0bc8042..68298d874 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -19,14 +19,19 @@ limitations under the License. #include "tensorflow/core/platform/errors.h" #include "tensorflow/core/platform/statusor.h" +namespace cbt = ::google::cloud::bigtable; + namespace tensorflow { namespace io { namespace { + + #ifdef _WIN32 #include -inline StatusOr BytesToBool(const std::string& bytes) { +inline StatusOr BytesToBool(const cbt::Cell cell) { + std::string const& bytes = cell.value(); union { char byte; bool res; @@ -38,7 +43,8 @@ inline StatusOr BytesToBool(const std::string& bytes) { return u.res; } -inline StatusOr BytesToInt32(const std::string& bytes) { +inline StatusOr BytesToInt32(const cbt::Cell cell) { + std::string const& bytes = cell.value(); union { char bytes[4]; uint32_t res; @@ -50,19 +56,16 @@ inline StatusOr BytesToInt32(const std::string& bytes) { return ntohl(u.res); } -inline StatusOr BytesToInt64(const std::string& bytes) { - union { - char bytes[8]; - uint32_t res; - } u; - if (bytes.size() != 8U) { - return errors::InvalidArgument("Invalid int64 representation."); +inline StatusOr BytesToInt64(const cbt::Cell cell) { + auto maybe_value = cell.decode_big_endian_integer(); + if(!maybe_value.ok()){ + return errors::InvalidArgument("Invalid int32 representation."); } - memcpy(u.bytes, bytes.data(), 8); - return ntohl(u.res); // <======= FIXME! BUG! HELP! + return maybe_value.value(); } -inline StatusOr BytesToFloat(std::string const& s) { +inline StatusOr BytesToFloat(cbt::Cell cell) { + std::string const& s = cell.value(); auto const int_rep = BytesToInt32(s); if (!int_rep.ok()) { return int_rep; @@ -75,7 +78,8 @@ inline StatusOr BytesToFloat(std::string const& s) { return u.res; } -inline StatusOr BytesToDouble(std::string const& s) { +inline StatusOr BytesToDouble(cbt::Cell cell) { + std::string const& s = cell.value(); auto const int_rep = BytesToInt64(s); if (!int_rep.ok()) { return int_rep; @@ -91,7 +95,8 @@ inline StatusOr BytesToDouble(std::string const& s) { #include "rpc/xdr.h" -inline StatusOr BytesToFloat(std::string const& s) { +inline StatusOr BytesToFloat(cbt::Cell cell) { + std::string const& s = cell.value(); float v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -101,7 +106,8 @@ inline StatusOr BytesToFloat(std::string const& s) { return v; } -inline StatusOr BytesToDouble(std::string const& s) { +inline StatusOr BytesToDouble(cbt::Cell cell) { + std::string const& s = cell.value(); double v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -111,7 +117,8 @@ inline StatusOr BytesToDouble(std::string const& s) { return v; } -inline StatusOr BytesToInt64(std::string const& s) { +inline StatusOr BytesToInt64(cbt::Cell cell) { + std::string const& s = cell.value(); int64_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -121,7 +128,8 @@ inline StatusOr BytesToInt64(std::string const& s) { return v; } -inline StatusOr BytesToInt32(std::string const& s) { +inline StatusOr BytesToInt32(cbt::Cell cell) { + std::string const& s = cell.value(); int32_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -131,7 +139,8 @@ inline StatusOr BytesToInt32(std::string const& s) { return v; } -inline StatusOr BytesToBool(std::string const& s) { +inline StatusOr BytesToBool(cbt::Cell cell) { + std::string const& s = cell.value(); bool_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -152,7 +161,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_BOOL: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBool(cell.value()); + auto maybe_parsed_data = BytesToBool(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -160,7 +169,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT32: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32(cell.value()); + auto maybe_parsed_data = BytesToInt32(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -168,7 +177,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT64: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64(cell.value()); + auto maybe_parsed_data = BytesToInt64(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -176,7 +185,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_FLOAT: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloat(cell.value()); + auto maybe_parsed_data = BytesToFloat(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -184,7 +193,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_DOUBLE: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDouble(cell.value()); + auto maybe_parsed_data = BytesToDouble(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } From 4cd52d56e55c6c756398754a54707e5e8f487d70 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 19:08:15 +0100 Subject: [PATCH 06/30] using two implementations --- .../bigtable/bigtable_dataset_kernel.cc | 3 +- .../core/kernels/bigtable/serialization.cc | 154 ++++++++++-------- .../core/kernels/bigtable/serialization.h | 39 ++++- 3 files changed, 122 insertions(+), 74 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 596da7396..002cc4339 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -186,7 +186,7 @@ class Iterator : public DatasetIterator { if (column_idx != column_to_idx_.end()) { VLOG(1) << "getting column:" << column_idx->second; TF_RETURN_IF_ERROR( - io::PutCellValueInTensor(res, column_idx->second, dtype, cell)); + serializer_.PutCellValueInTensor(res, column_idx->second, dtype, cell)); } else { LOG(ERROR) << "column " << cell.family_name() << ":" << cell.column_qualifier() @@ -275,6 +275,7 @@ class Iterator : public DatasetIterator { const absl::flat_hash_map, size_t> column_to_idx_; + const io::Serializer serializer_; }; class Dataset : public DatasetBase { diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 68298d874..a9beb2a15 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -15,7 +15,6 @@ limitations under the License. */ #include "tensorflow_io/core/kernels/bigtable/serialization.h" - #include "tensorflow/core/platform/errors.h" #include "tensorflow/core/platform/statusor.h" @@ -30,72 +29,32 @@ namespace { #include -inline StatusOr BytesToBool(const cbt::Cell cell) { - std::string const& bytes = cell.value(); - union { - char byte; - bool res; - } u; - if (bytes.size() != 1U) { - return errors::InvalidArgument("Invalid bool representation."); - } - u.byte = bytes[0]; - return u.res; + +inline StatusOr BytesToFloat(const cbt::Cell & cell) { + return errors::Unimplemented("Use BytesToFloatWin instead."); } -inline StatusOr BytesToInt32(const cbt::Cell cell) { - std::string const& bytes = cell.value(); - union { - char bytes[4]; - uint32_t res; - } u; - if (bytes.size() != 4U) { - return errors::InvalidArgument("Invalid int32 representation."); - } - memcpy(u.bytes, bytes.data(), 4); - return ntohl(u.res); +inline StatusOr BytesToDouble(const cbt::Cell & cell) { + return errors::Unimplemented("Use BytesToDoubleWin instead."); } -inline StatusOr BytesToInt64(const cbt::Cell cell) { - auto maybe_value = cell.decode_big_endian_integer(); - if(!maybe_value.ok()){ - return errors::InvalidArgument("Invalid int32 representation."); - } - return maybe_value.value(); +inline StatusOr BytesToInt64(const cbt::Cell & cell) { + return errors::Unimplemented("Use BytesToInt64Win instead."); } -inline StatusOr BytesToFloat(cbt::Cell cell) { - std::string const& s = cell.value(); - auto const int_rep = BytesToInt32(s); - if (!int_rep.ok()) { - return int_rep; - } - union { - float res; - uint32_t int_rep; - } u; - u.int_rep = *int_rep; - return u.res; +inline StatusOr BytesToInt32(const cbt::Cell & cell) { + return errors::Unimplemented("Use BytesToInt32Win instead."); } -inline StatusOr BytesToDouble(cbt::Cell cell) { - std::string const& s = cell.value(); - auto const int_rep = BytesToInt64(s); - if (!int_rep.ok()) { - return int_rep; - } - union { - double res; - uint64_t int_rep; - } u; - u.int_rep = *int_rep; - return u.res; +inline StatusOr BytesToBool(const cbt::Cell & cell) { + return errors::Unimplemented("Use BytesToBoolWin instead."); } + #else // _WIN32 +#include -#include "rpc/xdr.h" -inline StatusOr BytesToFloat(cbt::Cell cell) { +inline StatusOr BytesToFloat(const cbt::Cell & cell) { std::string const& s = cell.value(); float v; XDR xdrs; @@ -106,7 +65,7 @@ inline StatusOr BytesToFloat(cbt::Cell cell) { return v; } -inline StatusOr BytesToDouble(cbt::Cell cell) { +inline StatusOr BytesToDouble(const cbt::Cell & cell) { std::string const& s = cell.value(); double v; XDR xdrs; @@ -117,7 +76,7 @@ inline StatusOr BytesToDouble(cbt::Cell cell) { return v; } -inline StatusOr BytesToInt64(cbt::Cell cell) { +inline StatusOr BytesToInt64(const cbt::Cell & cell) { std::string const& s = cell.value(); int64_t v; XDR xdrs; @@ -128,7 +87,7 @@ inline StatusOr BytesToInt64(cbt::Cell cell) { return v; } -inline StatusOr BytesToInt32(cbt::Cell cell) { +inline StatusOr BytesToInt32(const cbt::Cell & cell) { std::string const& s = cell.value(); int32_t v; XDR xdrs; @@ -139,7 +98,7 @@ inline StatusOr BytesToInt32(cbt::Cell cell) { return v; } -inline StatusOr BytesToBool(cbt::Cell cell) { +inline StatusOr BytesToBool(const cbt::Cell & cell) { std::string const& s = cell.value(); bool_t v; XDR xdrs; @@ -151,9 +110,72 @@ inline StatusOr BytesToBool(cbt::Cell cell) { } #endif // _WIN32 + +inline StatusOr BytesToBoolWin(const cbt::Cell & cell) { + std::string const& bytes = cell.value(); + union { + char byte; + bool res; + } u; + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); + } + u.byte = bytes[0]; + return u.res; +} + +inline StatusOr BytesToInt32Win(const cbt::Cell & cell) { + std::string const& bytes = cell.value(); + union { + char bytes[4]; + int32_t res; + } u; + if (bytes.size() != 4U) { + return errors::InvalidArgument("Invalid int32 representation."); + } + memcpy(u.bytes, bytes.data(), 4); + return ntohl(u.res); +} + +inline StatusOr BytesToInt64Win(const cbt::Cell & cell) { + auto maybe_value = cell.decode_big_endian_integer(); + if(!maybe_value.ok()){ + return errors::InvalidArgument("Invalid int32 representation."); + } + return maybe_value.value(); +} + +inline StatusOr BytesToFloatWin(const cbt::Cell & cell) { + auto const int_rep = BytesToInt32Win(cell); + if (!int_rep.ok()) { + return int_rep; + } + union { + float res; + int32_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + +inline StatusOr BytesToDoubleWin(const cbt::Cell & cell) { + auto const int_rep = BytesToInt64Win(cell); + if (!int_rep.ok()) { + return int_rep; + } + union { + double res; + int64_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + + } // namespace -Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) { +Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const { + switch (cell_type) { case DT_STRING: { auto tensor_data = tensor.tensor(); @@ -161,7 +183,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_BOOL: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBool(cell); + auto maybe_parsed_data = use_xdr_ ? BytesToBool(cell) : BytesToBoolWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -169,7 +191,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT32: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32(cell); + auto maybe_parsed_data = use_xdr_ ? BytesToInt32(cell) : BytesToInt32Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -177,7 +199,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT64: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64(cell); + auto maybe_parsed_data = use_xdr_ ? BytesToInt64(cell) : BytesToInt64Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -185,7 +207,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_FLOAT: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloat(cell); + auto maybe_parsed_data = use_xdr_ ? BytesToFloat(cell) : BytesToFloatWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -193,7 +215,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_DOUBLE: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDouble(cell); + auto maybe_parsed_data = use_xdr_ ? BytesToDouble(cell) : BytesToDoubleWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 962616eaa..922d1562d 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -16,19 +16,44 @@ limitations under the License. #ifndef SERIALIZATION_H #define SERIALIZATION_H +#include "tensorflow/core/platform/statusor.h" #include "google/cloud/bigtable/table.h" #include "tensorflow/core/framework/tensor.h" +#include "rpc/xdr.h" namespace tensorflow { namespace io { -// Bigtable only stores values as byte buffers - except for int64 the server -// side does not have any notion of types. Tensorflow, needs to store shorter -// integers, floats, doubles, so we needed to decide on how. We chose to follow -// what HBase does, since there is a path for migrating from HBase to Bigtable. -// XDR seems to match what HBase does. -Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell); +class Serializer { + public: + + Serializer(){ + VLOG(1) << "checking env TFIO_DONT_USE_XDR"; + + const char* var = std::getenv("TFIO_DONT_USE_XDR"); + VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; + if(var && var[0] == '1'){ + VLOG(1) << "using custom implementation for serialization"; + use_xdr_ = false; + } else { + VLOG(1) << "using XDR for serialization"; + use_xdr_ = false; + } + } + + + // Bigtable only stores values as byte buffers - except for int64 the server + // side does not have any notion of types. Tensorflow, needs to store shorter + // integers, floats, doubles, so we needed to decide on how. We chose to follow + // what HBase does, since there is a path for migrating from HBase to Bigtable. + // XDR seems to match what HBase does. + Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const; + + private: + bool use_xdr_; +}; + } // namespace io } // namespace tensorflow From 548ea9e9907c3b118f9371d7bdf0df794787649a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 19:12:34 +0100 Subject: [PATCH 07/30] removed too much logging --- tensorflow_io/core/kernels/bigtable/serialization.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 922d1562d..bdcb58444 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -28,8 +28,6 @@ class Serializer { public: Serializer(){ - VLOG(1) << "checking env TFIO_DONT_USE_XDR"; - const char* var = std::getenv("TFIO_DONT_USE_XDR"); VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; if(var && var[0] == '1'){ From 960d157a695a0ce6746836ca819090b82ec9c77a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 19:42:02 +0100 Subject: [PATCH 08/30] use_xdr proper value --- tensorflow_io/core/kernels/bigtable/serialization.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index bdcb58444..5c57383f6 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -35,7 +35,7 @@ class Serializer { use_xdr_ = false; } else { VLOG(1) << "using XDR for serialization"; - use_xdr_ = false; + use_xdr_ = true; } } From 08692f0c208b7c054c3b3950041a1e7be7fc875b Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 14:30:22 +0100 Subject: [PATCH 09/30] added tests for serialization --- test.py | 14 -- tests/test_bigtable/test_serialization.py | 159 ++++++++++++++++++++++ 2 files changed, 159 insertions(+), 14 deletions(-) delete mode 100644 test.py create mode 100644 tests/test_bigtable/test_serialization.py diff --git a/test.py b/test.py deleted file mode 100644 index 17535e49a..000000000 --- a/test.py +++ /dev/null @@ -1,14 +0,0 @@ -import tensorflow_io as tfio -from tensorflow_io import bigtable as bt - -_=[print(x) for x in dir(bt)] - -c = bt.BigtableClient("test-project", "test-instance") -t = c.get_table("t1") -row_s = row_set.from_rows_or_ranges(row_range.closed_range("row000", "row009")) - -read_rows = [ - r for r in t.read_rows(["cf1:c1"], row_set=row_s) -] -print(read_rows) - diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py new file mode 100644 index 000000000..790d7f36e --- /dev/null +++ b/tests/test_bigtable/test_serialization.py @@ -0,0 +1,159 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# disable module docstring for tests +# pylint: disable=C0114 +# disable class docstring for tests +# pylint: disable=C0115 + +import os +from .bigtable_emulator import BigtableEmulator +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient +import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range +import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set +import tensorflow as tf +from tensorflow import test +from google.auth.credentials import AnonymousCredentials +from google.cloud.bigtable import Client +import datetime + + +class BigtableReadTest(test.TestCase): + def setUp(self): + self.emulator = BigtableEmulator() + self.data = { + 'values': [i*10/7 for i in range(10)], + 'float': [b'\x00\x00\x00\x00', b'?\xb6\xdbn', b'@6\xdbn', b'@\x89$\x92', b'@\xb6\xdbn', b'@\xe4\x92I', b'A\t$\x92', b'A \x00\x00', b'A6\xdbn', b'AM\xb6\xdb'], + 'double': [b'\x00\x00\x00\x00\x00\x00\x00\x00', b'?\xf6\xdbm\xb6\xdbm\xb7', b'@\x06\xdbm\xb6\xdbm\xb7', b'@\x11$\x92I$\x92I', b'@\x16\xdbm\xb6\xdbm\xb7', b'@\x1c\x92I$\x92I%', b'@!$\x92I$\x92I', b'@$\x00\x00\x00\x00\x00\x00', b'@&\xdbm\xb6\xdbm\xb7', b'@)\xb6\xdbm\xb6\xdbn'], + 'int32': [b'\x00\x00\x00\x00', b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x04', b'\x00\x00\x00\x05', b'\x00\x00\x00\x07', b'\x00\x00\x00\x08', b'\x00\x00\x00\n', b'\x00\x00\x00\x0b', b'\x00\x00\x00\x0c'], + 'int64': [b'\x00\x00\x00\x00\x00\x00\x00\x00', b'\x00\x00\x00\x00\x00\x00\x00\x01', b'\x00\x00\x00\x00\x00\x00\x00\x02', b'\x00\x00\x00\x00\x00\x00\x00\x04', b'\x00\x00\x00\x00\x00\x00\x00\x05', b'\x00\x00\x00\x00\x00\x00\x00\x07', b'\x00\x00\x00\x00\x00\x00\x00\x08', b'\x00\x00\x00\x00\x00\x00\x00\n', b'\x00\x00\x00\x00\x00\x00\x00\x0b', b'\x00\x00\x00\x00\x00\x00\x00\x0c'], + 'bool': [b'\x00\x00\x00\x00', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01'], + } + + os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() + self.emulator.create_table( + "fake_project", "fake_instance", "test-table", ["fam1"] + ) + + client = Client(project='fake_project', + credentials=AnonymousCredentials(), + admin=True) + table = client.instance("fake_instance").table("test-table") + + for type_name in ['float', 'double', 'int32', 'int64', 'bool']: + rows = [] + for i,value in enumerate(self.data[type_name]): + row_key = "row" + str(i).rjust(3,"0") + row = table.direct_row(row_key) + row.set_cell("fam1", + type_name, + value, + timestamp=datetime.datetime.utcnow()) + rows.append(row) + table.mutate_rows(rows) + + def tearDown(self): + self.emulator.stop() + + def test_float(self): + TYPE_NAME = "float" + TF_DTYPE = tf.float32 + + values = tf.constant(self.data['values'], dtype=TF_DTYPE) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + for i, r in enumerate( + table.read_rows( + ["fam1:" +TYPE_NAME], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=TF_DTYPE + ) + ): + self.assertEqual(values[i].numpy(), r.numpy()[0]) + + def test_double(self): + TYPE_NAME = "double" + TF_DTYPE = tf.float64 + + values = tf.constant(self.data['values'], dtype=TF_DTYPE) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + for i, r in enumerate( + table.read_rows( + ["fam1:" +TYPE_NAME], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=TF_DTYPE + ) + ): + self.assertEqual(values[i].numpy(), r.numpy()[0]) + + def test_int64(self): + TYPE_NAME = "int64" + TF_DTYPE = tf.int64 + + values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + for i, r in enumerate( + table.read_rows( + ["fam1:" +TYPE_NAME], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=TF_DTYPE + ) + ): + self.assertEqual(values[i].numpy(), r.numpy()[0]) + + + def test_int32(self): + TYPE_NAME = "int32" + TF_DTYPE = tf.int32 + + values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + for i, r in enumerate( + table.read_rows( + ["fam1:" +TYPE_NAME], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=TF_DTYPE + ) + ): + self.assertEqual(values[i].numpy(), r.numpy()[0]) + + def test_bool(self): + TYPE_NAME = "bool" + TF_DTYPE = tf.bool + + values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + for i, r in enumerate( + table.read_rows( + ["fam1:" +TYPE_NAME], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=TF_DTYPE + ) + ): + self.assertEqual(values[i].numpy(), r.numpy()[0]) + From 2a9bb12464e8ad6921da24d629f54da116f620f3 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 15:04:56 +0100 Subject: [PATCH 10/30] different bool byte representation --- .../core/kernels/bigtable/serialization.cc | 26 +-- tests/test_bigtable/test_serialization.py | 155 +++++++++++------- 2 files changed, 105 insertions(+), 76 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index a9beb2a15..1c9cd5986 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -111,19 +111,6 @@ inline StatusOr BytesToBool(const cbt::Cell & cell) { #endif // _WIN32 -inline StatusOr BytesToBoolWin(const cbt::Cell & cell) { - std::string const& bytes = cell.value(); - union { - char byte; - bool res; - } u; - if (bytes.size() != 1U) { - return errors::InvalidArgument("Invalid bool representation."); - } - u.byte = bytes[0]; - return u.res; -} - inline StatusOr BytesToInt32Win(const cbt::Cell & cell) { std::string const& bytes = cell.value(); union { @@ -145,6 +132,19 @@ inline StatusOr BytesToInt64Win(const cbt::Cell & cell) { return maybe_value.value(); } +inline StatusOr BytesToBoolWin(const cbt::Cell & cell) { + auto const int_rep = BytesToInt32Win(cell); + if (!int_rep.ok()) { + return int_rep; + } + union { + bool res; + int32_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + inline StatusOr BytesToFloatWin(const cbt::Cell & cell) { auto const int_rep = BytesToInt32Win(cell); if (!int_rep.ok()) { diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index 790d7f36e..c353c1dd3 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -28,6 +28,18 @@ from google.cloud.bigtable import Client import datetime +USE_XDR_ENV_VAR_NAME = "TFIO_DONT_USE_XDR" + + +def check_values(test_case, values, table, type_name, tf_dtype): + for i, r in enumerate( + table.read_rows( + ["fam1:" +type_name], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=tf_dtype + ) + ): + test_case.assertEqual(values[i].numpy(), r.numpy()[0]) class BigtableReadTest(test.TestCase): def setUp(self): @@ -66,94 +78,111 @@ def setUp(self): def tearDown(self): self.emulator.stop() - def test_float(self): - TYPE_NAME = "float" - TF_DTYPE = tf.float32 + def test_float_xdr(self): + if USE_XDR_ENV_VAR_NAME in os.environ: + del os.environ[USE_XDR_ENV_VAR_NAME] + + values = tf.constant(self.data['values'], dtype=tf.float32) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + check_values(self, values, table, "float", tf.float32) + + + def test_float_win(self): + os.environ[USE_XDR_ENV_VAR_NAME] = '1' + + values = tf.constant(self.data['values'], dtype=tf.float32) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + check_values(self, values, table, "float", tf.float32) + + def test_double_xdr(self): + if USE_XDR_ENV_VAR_NAME in os.environ: + del os.environ[USE_XDR_ENV_VAR_NAME] + + values = tf.constant(self.data['values'], dtype=tf.float64) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + check_values(self, values, table, "double", tf.float64) + - values = tf.constant(self.data['values'], dtype=TF_DTYPE) + def test_double_win(self): + os.environ[USE_XDR_ENV_VAR_NAME] = '1' + values = tf.constant(self.data['values'], dtype=tf.float64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - for i, r in enumerate( - table.read_rows( - ["fam1:" +TYPE_NAME], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=TF_DTYPE - ) - ): - self.assertEqual(values[i].numpy(), r.numpy()[0]) + check_values(self, values, table, "double", tf.float64) + + + def test_int64_xdr(self): + if USE_XDR_ENV_VAR_NAME in os.environ: + del os.environ[USE_XDR_ENV_VAR_NAME] + + values = tf.cast(tf.constant(self.data['values']), dtype=tf.int64) - def test_double(self): - TYPE_NAME = "double" - TF_DTYPE = tf.float64 + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + check_values(self, values, table, "int64", tf.int64) + + def test_int64_win(self): + os.environ[USE_XDR_ENV_VAR_NAME] = '1' - values = tf.constant(self.data['values'], dtype=TF_DTYPE) + values = tf.cast(tf.constant(self.data['values']), dtype=tf.int64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - for i, r in enumerate( - table.read_rows( - ["fam1:" +TYPE_NAME], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=TF_DTYPE - ) - ): - self.assertEqual(values[i].numpy(), r.numpy()[0]) + check_values(self, values, table, "int64", tf.int64) - def test_int64(self): - TYPE_NAME = "int64" - TF_DTYPE = tf.int64 + def test_int32_xdr(self): + if USE_XDR_ENV_VAR_NAME in os.environ: + del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + values = tf.cast(tf.constant(self.data['values']), dtype=tf.int32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - for i, r in enumerate( - table.read_rows( - ["fam1:" +TYPE_NAME], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=TF_DTYPE - ) - ): - self.assertEqual(values[i].numpy(), r.numpy()[0]) + check_values(self, values, table, "int32", tf.int32) + def test_int32_win(self): + os.environ[USE_XDR_ENV_VAR_NAME] = '1' - def test_int32(self): - TYPE_NAME = "int32" - TF_DTYPE = tf.int32 - - values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + values = tf.cast(tf.constant(self.data['values']), dtype=tf.int32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - for i, r in enumerate( - table.read_rows( - ["fam1:" +TYPE_NAME], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=TF_DTYPE - ) - ): - self.assertEqual(values[i].numpy(), r.numpy()[0]) - - def test_bool(self): - TYPE_NAME = "bool" - TF_DTYPE = tf.bool - - values = tf.cast(tf.constant(self.data['values']), dtype=TF_DTYPE) + check_values(self, values, table, "int32", tf.int32) + + + def test_bool_xdr(self): + if USE_XDR_ENV_VAR_NAME in os.environ: + del os.environ[USE_XDR_ENV_VAR_NAME] + + values = tf.cast(tf.constant(self.data['values']), dtype=tf.bool) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - for i, r in enumerate( - table.read_rows( - ["fam1:" +TYPE_NAME], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=TF_DTYPE - ) - ): - self.assertEqual(values[i].numpy(), r.numpy()[0]) + check_values(self, values, table, "bool", tf.bool) + + + def test_bool_win(self): + os.environ[USE_XDR_ENV_VAR_NAME] = '1' + + values = tf.cast(tf.constant(self.data['values']), dtype=tf.bool) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + check_values(self, values, table, "bool", tf.bool) From 078b218bd4735be423209b0ef3cc357884489b3d Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 15:31:38 +0100 Subject: [PATCH 11/30] linting --- .../bigtable/bigtable_dataset_kernel.cc | 4 +- .../core/kernels/bigtable/serialization.cc | 60 ++++---- .../core/kernels/bigtable/serialization.h | 51 ++++--- tests/test_bigtable/test_serialization.py | 130 ++++++++++++------ 4 files changed, 147 insertions(+), 98 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 002cc4339..c02238590 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -185,8 +185,8 @@ class Iterator : public DatasetIterator { const auto column_idx = column_to_idx_.find(key); if (column_idx != column_to_idx_.end()) { VLOG(1) << "getting column:" << column_idx->second; - TF_RETURN_IF_ERROR( - serializer_.PutCellValueInTensor(res, column_idx->second, dtype, cell)); + TF_RETURN_IF_ERROR(serializer_.PutCellValueInTensor( + res, column_idx->second, dtype, cell)); } else { LOG(ERROR) << "column " << cell.family_name() << ":" << cell.column_qualifier() diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 1c9cd5986..b37281735 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -15,6 +15,7 @@ limitations under the License. */ #include "tensorflow_io/core/kernels/bigtable/serialization.h" + #include "tensorflow/core/platform/errors.h" #include "tensorflow/core/platform/statusor.h" @@ -24,37 +25,34 @@ namespace tensorflow { namespace io { namespace { - #ifdef _WIN32 #include - -inline StatusOr BytesToFloat(const cbt::Cell & cell) { +inline StatusOr BytesToFloat(const cbt::Cell& cell) { return errors::Unimplemented("Use BytesToFloatWin instead."); } -inline StatusOr BytesToDouble(const cbt::Cell & cell) { +inline StatusOr BytesToDouble(const cbt::Cell& cell) { return errors::Unimplemented("Use BytesToDoubleWin instead."); } -inline StatusOr BytesToInt64(const cbt::Cell & cell) { +inline StatusOr BytesToInt64(const cbt::Cell& cell) { return errors::Unimplemented("Use BytesToInt64Win instead."); } -inline StatusOr BytesToInt32(const cbt::Cell & cell) { +inline StatusOr BytesToInt32(const cbt::Cell& cell) { return errors::Unimplemented("Use BytesToInt32Win instead."); } -inline StatusOr BytesToBool(const cbt::Cell & cell) { +inline StatusOr BytesToBool(const cbt::Cell& cell) { return errors::Unimplemented("Use BytesToBoolWin instead."); } #else // _WIN32 #include - -inline StatusOr BytesToFloat(const cbt::Cell & cell) { +inline StatusOr BytesToFloat(const cbt::Cell& cell) { std::string const& s = cell.value(); float v; XDR xdrs; @@ -65,7 +63,7 @@ inline StatusOr BytesToFloat(const cbt::Cell & cell) { return v; } -inline StatusOr BytesToDouble(const cbt::Cell & cell) { +inline StatusOr BytesToDouble(const cbt::Cell& cell) { std::string const& s = cell.value(); double v; XDR xdrs; @@ -76,7 +74,7 @@ inline StatusOr BytesToDouble(const cbt::Cell & cell) { return v; } -inline StatusOr BytesToInt64(const cbt::Cell & cell) { +inline StatusOr BytesToInt64(const cbt::Cell& cell) { std::string const& s = cell.value(); int64_t v; XDR xdrs; @@ -87,7 +85,7 @@ inline StatusOr BytesToInt64(const cbt::Cell & cell) { return v; } -inline StatusOr BytesToInt32(const cbt::Cell & cell) { +inline StatusOr BytesToInt32(const cbt::Cell& cell) { std::string const& s = cell.value(); int32_t v; XDR xdrs; @@ -98,7 +96,7 @@ inline StatusOr BytesToInt32(const cbt::Cell & cell) { return v; } -inline StatusOr BytesToBool(const cbt::Cell & cell) { +inline StatusOr BytesToBool(const cbt::Cell& cell) { std::string const& s = cell.value(); bool_t v; XDR xdrs; @@ -111,7 +109,7 @@ inline StatusOr BytesToBool(const cbt::Cell & cell) { #endif // _WIN32 -inline StatusOr BytesToInt32Win(const cbt::Cell & cell) { +inline StatusOr BytesToInt32Win(const cbt::Cell& cell) { std::string const& bytes = cell.value(); union { char bytes[4]; @@ -124,15 +122,15 @@ inline StatusOr BytesToInt32Win(const cbt::Cell & cell) { return ntohl(u.res); } -inline StatusOr BytesToInt64Win(const cbt::Cell & cell) { +inline StatusOr BytesToInt64Win(const cbt::Cell& cell) { auto maybe_value = cell.decode_big_endian_integer(); - if(!maybe_value.ok()){ - return errors::InvalidArgument("Invalid int32 representation."); + if (!maybe_value.ok()) { + return errors::InvalidArgument("Invalid int32 representation."); } return maybe_value.value(); } -inline StatusOr BytesToBoolWin(const cbt::Cell & cell) { +inline StatusOr BytesToBoolWin(const cbt::Cell& cell) { auto const int_rep = BytesToInt32Win(cell); if (!int_rep.ok()) { return int_rep; @@ -145,7 +143,7 @@ inline StatusOr BytesToBoolWin(const cbt::Cell & cell) { return u.res; } -inline StatusOr BytesToFloatWin(const cbt::Cell & cell) { +inline StatusOr BytesToFloatWin(const cbt::Cell& cell) { auto const int_rep = BytesToInt32Win(cell); if (!int_rep.ok()) { return int_rep; @@ -158,7 +156,7 @@ inline StatusOr BytesToFloatWin(const cbt::Cell & cell) { return u.res; } -inline StatusOr BytesToDoubleWin(const cbt::Cell & cell) { +inline StatusOr BytesToDoubleWin(const cbt::Cell& cell) { auto const int_rep = BytesToInt64Win(cell); if (!int_rep.ok()) { return int_rep; @@ -171,11 +169,10 @@ inline StatusOr BytesToDoubleWin(const cbt::Cell & cell) { return u.res; } - } // namespace -Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const { - +Status Serializer::PutCellValueInTensor( + Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const { switch (cell_type) { case DT_STRING: { auto tensor_data = tensor.tensor(); @@ -183,7 +180,8 @@ Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType c } break; case DT_BOOL: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = use_xdr_ ? BytesToBool(cell) : BytesToBoolWin(cell); + auto maybe_parsed_data = + use_xdr_ ? BytesToBool(cell) : BytesToBoolWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -191,7 +189,8 @@ Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType c } break; case DT_INT32: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = use_xdr_ ? BytesToInt32(cell) : BytesToInt32Win(cell); + auto maybe_parsed_data = + use_xdr_ ? BytesToInt32(cell) : BytesToInt32Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -199,7 +198,8 @@ Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType c } break; case DT_INT64: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = use_xdr_ ? BytesToInt64(cell) : BytesToInt64Win(cell); + auto maybe_parsed_data = + use_xdr_ ? BytesToInt64(cell) : BytesToInt64Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -207,7 +207,8 @@ Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType c } break; case DT_FLOAT: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = use_xdr_ ? BytesToFloat(cell) : BytesToFloatWin(cell); + auto maybe_parsed_data = + use_xdr_ ? BytesToFloat(cell) : BytesToFloatWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -215,7 +216,8 @@ Status Serializer::PutCellValueInTensor(Tensor& tensor, size_t index, DataType c } break; case DT_DOUBLE: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = use_xdr_ ? BytesToDouble(cell) : BytesToDoubleWin(cell); + auto maybe_parsed_data = + use_xdr_ ? BytesToDouble(cell) : BytesToDoubleWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 5c57383f6..7824c016e 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -16,43 +16,40 @@ limitations under the License. #ifndef SERIALIZATION_H #define SERIALIZATION_H -#include "tensorflow/core/platform/statusor.h" #include "google/cloud/bigtable/table.h" -#include "tensorflow/core/framework/tensor.h" #include "rpc/xdr.h" +#include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/platform/statusor.h" namespace tensorflow { namespace io { class Serializer { - public: - - Serializer(){ - const char* var = std::getenv("TFIO_DONT_USE_XDR"); - VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; - if(var && var[0] == '1'){ - VLOG(1) << "using custom implementation for serialization"; - use_xdr_ = false; - } else { - VLOG(1) << "using XDR for serialization"; - use_xdr_ = true; - } + public: + Serializer() { + const char* var = std::getenv("TFIO_DONT_USE_XDR"); + VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; + if (var && var[0] == '1') { + VLOG(1) << "using custom implementation for serialization"; + use_xdr_ = false; + } else { + VLOG(1) << "using XDR for serialization"; + use_xdr_ = true; } - - - // Bigtable only stores values as byte buffers - except for int64 the server - // side does not have any notion of types. Tensorflow, needs to store shorter - // integers, floats, doubles, so we needed to decide on how. We chose to follow - // what HBase does, since there is a path for migrating from HBase to Bigtable. - // XDR seems to match what HBase does. - Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const; - - private: - bool use_xdr_; + } + + // Bigtable only stores values as byte buffers - except for int64 the server + // side does not have any notion of types. Tensorflow, needs to store shorter + // integers, floats, doubles, so we needed to decide on how. We chose to + // follow what HBase does, since there is a path for migrating from HBase to + // Bigtable. XDR seems to match what HBase does. + Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const; + + private: + bool use_xdr_; }; - } // namespace io } // namespace tensorflow diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index c353c1dd3..d4245e6a8 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -34,23 +34,79 @@ def check_values(test_case, values, table, type_name, tf_dtype): for i, r in enumerate( table.read_rows( - ["fam1:" +type_name], + ["fam1:" + type_name], row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=tf_dtype + output_type=tf_dtype, ) ): test_case.assertEqual(values[i].numpy(), r.numpy()[0]) + class BigtableReadTest(test.TestCase): def setUp(self): self.emulator = BigtableEmulator() self.data = { - 'values': [i*10/7 for i in range(10)], - 'float': [b'\x00\x00\x00\x00', b'?\xb6\xdbn', b'@6\xdbn', b'@\x89$\x92', b'@\xb6\xdbn', b'@\xe4\x92I', b'A\t$\x92', b'A \x00\x00', b'A6\xdbn', b'AM\xb6\xdb'], - 'double': [b'\x00\x00\x00\x00\x00\x00\x00\x00', b'?\xf6\xdbm\xb6\xdbm\xb7', b'@\x06\xdbm\xb6\xdbm\xb7', b'@\x11$\x92I$\x92I', b'@\x16\xdbm\xb6\xdbm\xb7', b'@\x1c\x92I$\x92I%', b'@!$\x92I$\x92I', b'@$\x00\x00\x00\x00\x00\x00', b'@&\xdbm\xb6\xdbm\xb7', b'@)\xb6\xdbm\xb6\xdbn'], - 'int32': [b'\x00\x00\x00\x00', b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x04', b'\x00\x00\x00\x05', b'\x00\x00\x00\x07', b'\x00\x00\x00\x08', b'\x00\x00\x00\n', b'\x00\x00\x00\x0b', b'\x00\x00\x00\x0c'], - 'int64': [b'\x00\x00\x00\x00\x00\x00\x00\x00', b'\x00\x00\x00\x00\x00\x00\x00\x01', b'\x00\x00\x00\x00\x00\x00\x00\x02', b'\x00\x00\x00\x00\x00\x00\x00\x04', b'\x00\x00\x00\x00\x00\x00\x00\x05', b'\x00\x00\x00\x00\x00\x00\x00\x07', b'\x00\x00\x00\x00\x00\x00\x00\x08', b'\x00\x00\x00\x00\x00\x00\x00\n', b'\x00\x00\x00\x00\x00\x00\x00\x0b', b'\x00\x00\x00\x00\x00\x00\x00\x0c'], - 'bool': [b'\x00\x00\x00\x00', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01', b'\x00\x00\x00\x01'], + "values": [i * 10 / 7 for i in range(10)], + "float": [ + b"\x00\x00\x00\x00", + b"?\xb6\xdbn", + b"@6\xdbn", + b"@\x89$\x92", + b"@\xb6\xdbn", + b"@\xe4\x92I", + b"A\t$\x92", + b"A \x00\x00", + b"A6\xdbn", + b"AM\xb6\xdb", + ], + "double": [ + b"\x00\x00\x00\x00\x00\x00\x00\x00", + b"?\xf6\xdbm\xb6\xdbm\xb7", + b"@\x06\xdbm\xb6\xdbm\xb7", + b"@\x11$\x92I$\x92I", + b"@\x16\xdbm\xb6\xdbm\xb7", + b"@\x1c\x92I$\x92I%", + b"@!$\x92I$\x92I", + b"@$\x00\x00\x00\x00\x00\x00", + b"@&\xdbm\xb6\xdbm\xb7", + b"@)\xb6\xdbm\xb6\xdbn", + ], + "int32": [ + b"\x00\x00\x00\x00", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x02", + b"\x00\x00\x00\x04", + b"\x00\x00\x00\x05", + b"\x00\x00\x00\x07", + b"\x00\x00\x00\x08", + b"\x00\x00\x00\n", + b"\x00\x00\x00\x0b", + b"\x00\x00\x00\x0c", + ], + "int64": [ + b"\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x01", + b"\x00\x00\x00\x00\x00\x00\x00\x02", + b"\x00\x00\x00\x00\x00\x00\x00\x04", + b"\x00\x00\x00\x00\x00\x00\x00\x05", + b"\x00\x00\x00\x00\x00\x00\x00\x07", + b"\x00\x00\x00\x00\x00\x00\x00\x08", + b"\x00\x00\x00\x00\x00\x00\x00\n", + b"\x00\x00\x00\x00\x00\x00\x00\x0b", + b"\x00\x00\x00\x00\x00\x00\x00\x0c", + ], + "bool": [ + b"\x00\x00\x00\x00", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x01", + ], } os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() @@ -58,20 +114,19 @@ def setUp(self): "fake_project", "fake_instance", "test-table", ["fam1"] ) - client = Client(project='fake_project', - credentials=AnonymousCredentials(), - admin=True) + client = Client( + project="fake_project", credentials=AnonymousCredentials(), admin=True + ) table = client.instance("fake_instance").table("test-table") - for type_name in ['float', 'double', 'int32', 'int64', 'bool']: + for type_name in ["float", "double", "int32", "int64", "bool"]: rows = [] - for i,value in enumerate(self.data[type_name]): - row_key = "row" + str(i).rjust(3,"0") + for i, value in enumerate(self.data[type_name]): + row_key = "row" + str(i).rjust(3, "0") row = table.direct_row(row_key) - row.set_cell("fam1", - type_name, - value, - timestamp=datetime.datetime.utcnow()) + row.set_cell( + "fam1", type_name, value, timestamp=datetime.datetime.utcnow() + ) rows.append(row) table.mutate_rows(rows) @@ -82,18 +137,17 @@ def test_float_xdr(self): if USE_XDR_ENV_VAR_NAME in os.environ: del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.constant(self.data['values'], dtype=tf.float32) + values = tf.constant(self.data["values"], dtype=tf.float32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "float", tf.float32) - def test_float_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = '1' + os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.constant(self.data['values'], dtype=tf.float32) + values = tf.constant(self.data["values"], dtype=tf.float32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") @@ -104,39 +158,37 @@ def test_double_xdr(self): if USE_XDR_ENV_VAR_NAME in os.environ: del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.constant(self.data['values'], dtype=tf.float64) + values = tf.constant(self.data["values"], dtype=tf.float64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "double", tf.float64) - def test_double_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = '1' - values = tf.constant(self.data['values'], dtype=tf.float64) + os.environ[USE_XDR_ENV_VAR_NAME] = "1" + values = tf.constant(self.data["values"], dtype=tf.float64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "double", tf.float64) - def test_int64_xdr(self): if USE_XDR_ENV_VAR_NAME in os.environ: del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.cast(tf.constant(self.data['values']), dtype=tf.int64) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "int64", tf.int64) - + def test_int64_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = '1' + os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data['values']), dtype=tf.int64) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") @@ -147,40 +199,38 @@ def test_int32_xdr(self): if USE_XDR_ENV_VAR_NAME in os.environ: del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.cast(tf.constant(self.data['values']), dtype=tf.int32) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "int32", tf.int32) - + def test_int32_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = '1' + os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data['values']), dtype=tf.int32) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "int32", tf.int32) - def test_bool_xdr(self): if USE_XDR_ENV_VAR_NAME in os.environ: del os.environ[USE_XDR_ENV_VAR_NAME] - values = tf.cast(tf.constant(self.data['values']), dtype=tf.bool) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") check_values(self, values, table, "bool", tf.bool) - def test_bool_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = '1' + os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data['values']), dtype=tf.bool) + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") From 52c567919d6cd51826ba091745a0448768ce2045 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 16:46:30 +0100 Subject: [PATCH 12/30] refactored code to two subclasses --- .../bigtable/bigtable_dataset_kernel.cc | 5 +- .../core/kernels/bigtable/serialization.cc | 123 +++++++++++++----- .../core/kernels/bigtable/serialization.h | 28 ++-- 3 files changed, 108 insertions(+), 48 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index c02238590..8cef560f3 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -146,6 +146,7 @@ class Iterator : public DatasetIterator { explicit Iterator(const typename DatasetIterator::Params& params, const std::vector& columns) : DatasetIterator(params), + serializer_(io::GetSerializer()), columns_(ColumnsToFamiliesAndQualifiers(columns)), reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), @@ -185,7 +186,7 @@ class Iterator : public DatasetIterator { const auto column_idx = column_to_idx_.find(key); if (column_idx != column_to_idx_.end()) { VLOG(1) << "getting column:" << column_idx->second; - TF_RETURN_IF_ERROR(serializer_.PutCellValueInTensor( + TF_RETURN_IF_ERROR(serializer_->PutCellValueInTensor( res, column_idx->second, dtype, cell)); } else { LOG(ERROR) << "column " << cell.family_name() << ":" @@ -267,6 +268,7 @@ class Iterator : public DatasetIterator { } mutex mu_; + std::unique_ptr serializer_ GUARDED_BY(mu_); const std::vector> columns_; cbt::RowReader reader_ GUARDED_BY(mu_); cbt::v1::internal::RowReaderIterator it_ GUARDED_BY(mu_); @@ -275,7 +277,6 @@ class Iterator : public DatasetIterator { const absl::flat_hash_map, size_t> column_to_idx_; - const io::Serializer serializer_; }; class Dataset : public DatasetBase { diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index b37281735..fb2be6216 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -29,29 +29,11 @@ namespace { #include -inline StatusOr BytesToFloat(const cbt::Cell& cell) { - return errors::Unimplemented("Use BytesToFloatWin instead."); -} - -inline StatusOr BytesToDouble(const cbt::Cell& cell) { - return errors::Unimplemented("Use BytesToDoubleWin instead."); -} - -inline StatusOr BytesToInt64(const cbt::Cell& cell) { - return errors::Unimplemented("Use BytesToInt64Win instead."); -} - -inline StatusOr BytesToInt32(const cbt::Cell& cell) { - return errors::Unimplemented("Use BytesToInt32Win instead."); -} - -inline StatusOr BytesToBool(const cbt::Cell& cell) { - return errors::Unimplemented("Use BytesToBoolWin instead."); -} - #else // _WIN32 #include +#include "rpc/xdr.h" + inline StatusOr BytesToFloat(const cbt::Cell& cell) { std::string const& s = cell.value(); float v; @@ -170,7 +152,91 @@ inline StatusOr BytesToDoubleWin(const cbt::Cell& cell) { } } // namespace -Status Serializer::PutCellValueInTensor( + +#ifdef _WIN32 + +std::unique_ptr GetSerializer() { + VLOG(1) << "using custom implementation for serialization"; + return absl::make_unique(); +} + +Status XDRSerializer::PutCellValueInTensor( + Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const { + return errors::Unimplemented("XDR serializer is not available on windows."); +} + +#else // _WIN32 + +std::unique_ptr GetSerializer() { + const char* var = std::getenv("TFIO_DONT_USE_XDR"); + VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; + if (var && var[0] == '1') { + VLOG(1) << "using custom implementation for serialization"; + return absl::make_unique(); + } else { + VLOG(1) << "using XDR for serialization"; + return absl::make_unique(); + } +} + +Status XDRSerializer::PutCellValueInTensor( + Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const { + switch (cell_type) { + case DT_STRING: { + auto tensor_data = tensor.tensor(); + tensor_data(index) = std::string(cell.value()); + } break; + case DT_BOOL: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToBool(cell); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT32: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt32(cell); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_INT64: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToInt64(cell); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_FLOAT: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToFloat(cell); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + case DT_DOUBLE: { + auto tensor_data = tensor.tensor(); + auto maybe_parsed_data = BytesToDouble(cell); + if (!maybe_parsed_data.ok()) { + return maybe_parsed_data.status(); + } + tensor_data(index) = maybe_parsed_data.ValueOrDie(); + } break; + default: + return errors::Unimplemented("Data type not supported."); + } + return Status::OK(); +} + +#endif // _WIN32 + +Status CustomSerializer::PutCellValueInTensor( Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell) const { switch (cell_type) { @@ -180,8 +246,7 @@ Status Serializer::PutCellValueInTensor( } break; case DT_BOOL: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = - use_xdr_ ? BytesToBool(cell) : BytesToBoolWin(cell); + auto maybe_parsed_data = BytesToBoolWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -189,8 +254,7 @@ Status Serializer::PutCellValueInTensor( } break; case DT_INT32: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = - use_xdr_ ? BytesToInt32(cell) : BytesToInt32Win(cell); + auto maybe_parsed_data = BytesToInt32Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -198,8 +262,7 @@ Status Serializer::PutCellValueInTensor( } break; case DT_INT64: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = - use_xdr_ ? BytesToInt64(cell) : BytesToInt64Win(cell); + auto maybe_parsed_data = BytesToInt64Win(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -207,8 +270,7 @@ Status Serializer::PutCellValueInTensor( } break; case DT_FLOAT: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = - use_xdr_ ? BytesToFloat(cell) : BytesToFloatWin(cell); + auto maybe_parsed_data = BytesToFloatWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -216,8 +278,7 @@ Status Serializer::PutCellValueInTensor( } break; case DT_DOUBLE: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = - use_xdr_ ? BytesToDouble(cell) : BytesToDoubleWin(cell); + auto maybe_parsed_data = BytesToDoubleWin(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 7824c016e..02c2a9c29 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -17,7 +17,6 @@ limitations under the License. #define SERIALIZATION_H #include "google/cloud/bigtable/table.h" -#include "rpc/xdr.h" #include "tensorflow/core/framework/tensor.h" #include "tensorflow/core/platform/statusor.h" @@ -26,30 +25,29 @@ namespace io { class Serializer { public: - Serializer() { - const char* var = std::getenv("TFIO_DONT_USE_XDR"); - VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; - if (var && var[0] == '1') { - VLOG(1) << "using custom implementation for serialization"; - use_xdr_ = false; - } else { - VLOG(1) << "using XDR for serialization"; - use_xdr_ = true; - } - } - // Bigtable only stores values as byte buffers - except for int64 the server // side does not have any notion of types. Tensorflow, needs to store shorter // integers, floats, doubles, so we needed to decide on how. We chose to // follow what HBase does, since there is a path for migrating from HBase to // Bigtable. XDR seems to match what HBase does. + virtual Status PutCellValueInTensor( + Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const = 0; + ; +}; + +class CustomSerializer : public Serializer { Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell) const; +}; - private: - bool use_xdr_; +class XDRSerializer : public Serializer { + Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) const; }; +std::unique_ptr GetSerializer(); + } // namespace io } // namespace tensorflow From 26203572a95771da00713628d9992c27c3c0565a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 16:50:34 +0100 Subject: [PATCH 13/30] refactored name of ReinterpretSerializer --- tensorflow_io/core/kernels/bigtable/serialization.cc | 6 +++--- tensorflow_io/core/kernels/bigtable/serialization.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index fb2be6216..890f44f47 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -157,7 +157,7 @@ inline StatusOr BytesToDoubleWin(const cbt::Cell& cell) { std::unique_ptr GetSerializer() { VLOG(1) << "using custom implementation for serialization"; - return absl::make_unique(); + return absl::make_unique(); } Status XDRSerializer::PutCellValueInTensor( @@ -173,7 +173,7 @@ std::unique_ptr GetSerializer() { VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; if (var && var[0] == '1') { VLOG(1) << "using custom implementation for serialization"; - return absl::make_unique(); + return absl::make_unique(); } else { VLOG(1) << "using XDR for serialization"; return absl::make_unique(); @@ -236,7 +236,7 @@ Status XDRSerializer::PutCellValueInTensor( #endif // _WIN32 -Status CustomSerializer::PutCellValueInTensor( +Status ReinterpretSerializer::PutCellValueInTensor( Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell) const { switch (cell_type) { diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 02c2a9c29..ab377c845 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -36,7 +36,7 @@ class Serializer { ; }; -class CustomSerializer : public Serializer { +class ReinterpretSerializer : public Serializer { Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell) const; }; From 438a9a02ed08de565f759cef1c6f922bb62a9ec0 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 17:29:45 +0100 Subject: [PATCH 14/30] wrong type --- tensorflow_io/core/kernels/bigtable/serialization.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 890f44f47..dba6f1845 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -118,7 +118,7 @@ inline StatusOr BytesToBoolWin(const cbt::Cell& cell) { return int_rep; } union { - bool res; + bool_t res; int32_t int_rep; } u; u.int_rep = *int_rep; From 92e8709d3fe6bb482b787b6d195baba35f3a7cfb Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:21:42 +0100 Subject: [PATCH 15/30] removed switching serializers --- .../bigtable/bigtable_dataset_kernel.cc | 4 +- .../core/kernels/bigtable/serialization.cc | 208 ++++++------------ .../core/kernels/bigtable/serialization.h | 19 +- 3 files changed, 66 insertions(+), 165 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 8cef560f3..4a9a512de 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -146,7 +146,6 @@ class Iterator : public DatasetIterator { explicit Iterator(const typename DatasetIterator::Params& params, const std::vector& columns) : DatasetIterator(params), - serializer_(io::GetSerializer()), columns_(ColumnsToFamiliesAndQualifiers(columns)), reader_(this->dataset()->CreateTable().ReadRows( this->dataset()->row_set(), @@ -186,7 +185,7 @@ class Iterator : public DatasetIterator { const auto column_idx = column_to_idx_.find(key); if (column_idx != column_to_idx_.end()) { VLOG(1) << "getting column:" << column_idx->second; - TF_RETURN_IF_ERROR(serializer_->PutCellValueInTensor( + TF_RETURN_IF_ERROR(io::PutCellValueInTensor( res, column_idx->second, dtype, cell)); } else { LOG(ERROR) << "column " << cell.family_name() << ":" @@ -268,7 +267,6 @@ class Iterator : public DatasetIterator { } mutex mu_; - std::unique_ptr serializer_ GUARDED_BY(mu_); const std::vector> columns_; cbt::RowReader reader_ GUARDED_BY(mu_); cbt::v1::internal::RowReaderIterator it_ GUARDED_BY(mu_); diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index dba6f1845..114614417 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -29,69 +29,8 @@ namespace { #include -#else // _WIN32 -#include - -#include "rpc/xdr.h" - -inline StatusOr BytesToFloat(const cbt::Cell& cell) { - std::string const& s = cell.value(); - float v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_float(&xdrs, &v)) { - return errors::InvalidArgument("Error reading float from byte array."); - } - return v; -} - -inline StatusOr BytesToDouble(const cbt::Cell& cell) { - std::string const& s = cell.value(); - double v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_double(&xdrs, &v)) { - return errors::InvalidArgument("Error reading double from byte array."); - } - return v; -} - -inline StatusOr BytesToInt64(const cbt::Cell& cell) { - std::string const& s = cell.value(); - int64_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int64_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int64 from byte array."); - } - return v; -} inline StatusOr BytesToInt32(const cbt::Cell& cell) { - std::string const& s = cell.value(); - int32_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int32_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading int32 from byte array."); - } - return v; -} - -inline StatusOr BytesToBool(const cbt::Cell& cell) { - std::string const& s = cell.value(); - bool_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_bool(&xdrs, &v)) { - return errors::InvalidArgument("Error reading bool from byte array."); - } - return v; -} - -#endif // _WIN32 - -inline StatusOr BytesToInt32Win(const cbt::Cell& cell) { std::string const& bytes = cell.value(); union { char bytes[4]; @@ -104,7 +43,7 @@ inline StatusOr BytesToInt32Win(const cbt::Cell& cell) { return ntohl(u.res); } -inline StatusOr BytesToInt64Win(const cbt::Cell& cell) { +inline StatusOr BytesToInt64(const cbt::Cell& cell) { auto maybe_value = cell.decode_big_endian_integer(); if (!maybe_value.ok()) { return errors::InvalidArgument("Invalid int32 representation."); @@ -112,8 +51,8 @@ inline StatusOr BytesToInt64Win(const cbt::Cell& cell) { return maybe_value.value(); } -inline StatusOr BytesToBoolWin(const cbt::Cell& cell) { - auto const int_rep = BytesToInt32Win(cell); +inline StatusOr BytesToBool(const cbt::Cell& cell) { + auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { return int_rep; } @@ -125,8 +64,8 @@ inline StatusOr BytesToBoolWin(const cbt::Cell& cell) { return u.res; } -inline StatusOr BytesToFloatWin(const cbt::Cell& cell) { - auto const int_rep = BytesToInt32Win(cell); +inline StatusOr BytesToFloat(const cbt::Cell& cell) { + auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { return int_rep; } @@ -138,8 +77,8 @@ inline StatusOr BytesToFloatWin(const cbt::Cell& cell) { return u.res; } -inline StatusOr BytesToDoubleWin(const cbt::Cell& cell) { - auto const int_rep = BytesToInt64Win(cell); +inline StatusOr BytesToDouble(const cbt::Cell& cell) { + auto const int_rep = BytesToInt64(cell); if (!int_rep.ok()) { return int_rep; } @@ -151,38 +90,74 @@ inline StatusOr BytesToDoubleWin(const cbt::Cell& cell) { return u.res; } -} // namespace -#ifdef _WIN32 +#else // _WIN32 + +#include "rpc/xdr.h" -std::unique_ptr GetSerializer() { - VLOG(1) << "using custom implementation for serialization"; - return absl::make_unique(); +inline StatusOr BytesToFloat(const cbt::Cell& cell) { + std::string const& s = cell.value(); + float v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_float(&xdrs, &v)) { + return errors::InvalidArgument("Error reading float from byte array."); + } + return v; } -Status XDRSerializer::PutCellValueInTensor( - Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const { - return errors::Unimplemented("XDR serializer is not available on windows."); +inline StatusOr BytesToDouble(const cbt::Cell& cell) { + std::string const& s = cell.value(); + double v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_double(&xdrs, &v)) { + return errors::InvalidArgument("Error reading double from byte array."); + } + return v; } -#else // _WIN32 +inline StatusOr BytesToInt64(const cbt::Cell& cell) { + std::string const& s = cell.value(); + int64_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int64_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int64 from byte array."); + } + return v; +} -std::unique_ptr GetSerializer() { - const char* var = std::getenv("TFIO_DONT_USE_XDR"); - VLOG(1) << "got env TFIO_DONT_USE_XDR=" << var; - if (var && var[0] == '1') { - VLOG(1) << "using custom implementation for serialization"; - return absl::make_unique(); - } else { - VLOG(1) << "using XDR for serialization"; - return absl::make_unique(); +inline StatusOr BytesToInt32(const cbt::Cell& cell) { + std::string const& s = cell.value(); + int32_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_int32_t(&xdrs, &v)) { + return errors::InvalidArgument("Error reading int32 from byte array."); } + return v; } -Status XDRSerializer::PutCellValueInTensor( +inline StatusOr BytesToBool(const cbt::Cell& cell) { + std::string const& s = cell.value(); + bool_t v; + XDR xdrs; + xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); + if (!xdr_bool(&xdrs, &v)) { + return errors::InvalidArgument("Error reading bool from byte array."); + } + return v; +} + +#endif // _WIN32 + +} // namespace + + +Status PutCellValueInTensor( Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const { + google::cloud::bigtable::Cell const& cell) { switch (cell_type) { case DT_STRING: { auto tensor_data = tensor.tensor(); @@ -234,61 +209,6 @@ Status XDRSerializer::PutCellValueInTensor( return Status::OK(); } -#endif // _WIN32 - -Status ReinterpretSerializer::PutCellValueInTensor( - Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const { - switch (cell_type) { - case DT_STRING: { - auto tensor_data = tensor.tensor(); - tensor_data(index) = std::string(cell.value()); - } break; - case DT_BOOL: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBoolWin(cell); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT32: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32Win(cell); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_INT64: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64Win(cell); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_FLOAT: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloatWin(cell); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - case DT_DOUBLE: { - auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDoubleWin(cell); - if (!maybe_parsed_data.ok()) { - return maybe_parsed_data.status(); - } - tensor_data(index) = maybe_parsed_data.ValueOrDie(); - } break; - default: - return errors::Unimplemented("Data type not supported."); - } - return Status::OK(); -} } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index ab377c845..f57a48c4e 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -23,30 +23,13 @@ limitations under the License. namespace tensorflow { namespace io { -class Serializer { - public: // Bigtable only stores values as byte buffers - except for int64 the server // side does not have any notion of types. Tensorflow, needs to store shorter // integers, floats, doubles, so we needed to decide on how. We chose to // follow what HBase does, since there is a path for migrating from HBase to // Bigtable. XDR seems to match what HBase does. - virtual Status PutCellValueInTensor( - Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const = 0; - ; -}; - -class ReinterpretSerializer : public Serializer { - Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const; -}; - -class XDRSerializer : public Serializer { Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) const; -}; - -std::unique_ptr GetSerializer(); + google::cloud::bigtable::Cell const& cell); } // namespace io } // namespace tensorflow From c2b83cc434d17bb2ddb4db7e1db04d1621855020 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:25:04 +0100 Subject: [PATCH 16/30] upadated tests --- tests/test_bigtable/test_serialization.py | 68 +---------------------- 1 file changed, 1 insertion(+), 67 deletions(-) diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index d4245e6a8..9d57a7dfe 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -28,8 +28,6 @@ from google.cloud.bigtable import Client import datetime -USE_XDR_ENV_VAR_NAME = "TFIO_DONT_USE_XDR" - def check_values(test_case, values, table, type_name, tf_dtype): for i, r in enumerate( @@ -134,19 +132,6 @@ def tearDown(self): self.emulator.stop() def test_float_xdr(self): - if USE_XDR_ENV_VAR_NAME in os.environ: - del os.environ[USE_XDR_ENV_VAR_NAME] - - values = tf.constant(self.data["values"], dtype=tf.float32) - - client = BigtableClient("fake_project", "fake_instance") - table = client.get_table("test-table") - - check_values(self, values, table, "float", tf.float32) - - def test_float_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.constant(self.data["values"], dtype=tf.float32) client = BigtableClient("fake_project", "fake_instance") @@ -155,18 +140,6 @@ def test_float_win(self): check_values(self, values, table, "float", tf.float32) def test_double_xdr(self): - if USE_XDR_ENV_VAR_NAME in os.environ: - del os.environ[USE_XDR_ENV_VAR_NAME] - - values = tf.constant(self.data["values"], dtype=tf.float64) - - client = BigtableClient("fake_project", "fake_instance") - table = client.get_table("test-table") - - check_values(self, values, table, "double", tf.float64) - - def test_double_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = "1" values = tf.constant(self.data["values"], dtype=tf.float64) client = BigtableClient("fake_project", "fake_instance") @@ -175,19 +148,6 @@ def test_double_win(self): check_values(self, values, table, "double", tf.float64) def test_int64_xdr(self): - if USE_XDR_ENV_VAR_NAME in os.environ: - del os.environ[USE_XDR_ENV_VAR_NAME] - - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) - - client = BigtableClient("fake_project", "fake_instance") - table = client.get_table("test-table") - - check_values(self, values, table, "int64", tf.int64) - - def test_int64_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) client = BigtableClient("fake_project", "fake_instance") @@ -196,19 +156,6 @@ def test_int64_win(self): check_values(self, values, table, "int64", tf.int64) def test_int32_xdr(self): - if USE_XDR_ENV_VAR_NAME in os.environ: - del os.environ[USE_XDR_ENV_VAR_NAME] - - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) - - client = BigtableClient("fake_project", "fake_instance") - table = client.get_table("test-table") - - check_values(self, values, table, "int32", tf.int32) - - def test_int32_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) client = BigtableClient("fake_project", "fake_instance") @@ -217,22 +164,9 @@ def test_int32_win(self): check_values(self, values, table, "int32", tf.int32) def test_bool_xdr(self): - if USE_XDR_ENV_VAR_NAME in os.environ: - del os.environ[USE_XDR_ENV_VAR_NAME] - - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) - - client = BigtableClient("fake_project", "fake_instance") - table = client.get_table("test-table") - - check_values(self, values, table, "bool", tf.bool) - - def test_bool_win(self): - os.environ[USE_XDR_ENV_VAR_NAME] = "1" - values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "bool", tf.bool) + check_values(self, values, table, "bool", tf.bool) \ No newline at end of file From 2bb9f9242d9d3962e3700a6de997771e4e6c7355 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:40:01 +0100 Subject: [PATCH 17/30] PR comments --- WORKSPACE | 11 ++++++++--- bld.sh | 9 --------- tensorflow_io/core/kernels/bigtable/serialization.h | 10 +++++++++- 3 files changed, 17 insertions(+), 13 deletions(-) delete mode 100755 bld.sh diff --git a/WORKSPACE b/WORKSPACE index 4516171d4..f4c368bb0 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -111,13 +111,18 @@ http_archive( # Note com_google_googleapis is placed earlier as we need to adjust switched_rules_by_language option # Note we have to change one word in the field_behavior.proto so it compiles on WINDOWS # for more infor please refer to https://github.com/protocolbuffers/protobuf/issues/7076 +# Because of a bug in protocol buffers (protocolbuffers/protobuf#7076), new versions of this project +# fail to compile on Windows. The problem hinges on OPTIONAL being defined as an empty string under +# Windows. This makes the preprocessor remove every mention of OPTIONAL from the code, which causes +# compilation failures. This temporary workaround renames the name of the protobuf value OPTIONAL to +# OPIONAL. This should be safe as it does not affect the generated protobufs. http_archive( name = "com_google_googleapis", build_file = "@com_github_googleapis_google_cloud_cpp//bazel:googleapis.BUILD", patch_cmds = [ - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", - """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", + """sed -i 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", + """sed -i 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", + """sed -i 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", ], sha256 = "a53e15405f81d5a32594d7f6486e649131fadda5431cf28377dff4ae54d45d16", strip_prefix = "googleapis-d4d09eb3aec152015f35717102f9b423988b94f7", diff --git a/bld.sh b/bld.sh deleted file mode 100755 index e3726be67..000000000 --- a/bld.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/env bash - -ls dist/* -for f in dist/*.whl; do - docker run -i --rm -v $PWD:/v -w /v --net=host quay.io/pypa/manylinux2010_x86_64 bash -x -e /v/tools/build/auditwheel repair --plat manylinux2010_x86_64 $f -done -sudo chown -R $(id -nu):$(id -ng) . -ls wheelhouse/* - diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index f57a48c4e..494782e63 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -27,7 +27,15 @@ namespace io { // side does not have any notion of types. Tensorflow, needs to store shorter // integers, floats, doubles, so we needed to decide on how. We chose to // follow what HBase does, since there is a path for migrating from HBase to - // Bigtable. XDR seems to match what HBase does. + // Bigtable. HBase stores integers as big-endian and floats as IEEE754 + // (also big-endian). Given that integer endianness does not always match + // float endianness, and the fact that there are architectures where it is + // neither little nor big (BE-32), implementing this properly is non-trivial. + // Ideally, we would use a library to do that. XDR matches what HBase does, + // but it is not easily available on Windows, so we decided to go with a + // hybrid approach. On Windows we assume that integer endianness matches float + // endianness and implement the deserialization ourselves and everywhere else + // we use XDR. For that reason we provide two implementations Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell); From 9b1489d7eb3c02744443185ae59f3a9793b4c0a3 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:42:29 +0100 Subject: [PATCH 18/30] linting --- WORKSPACE | 8 ++--- .../bigtable/bigtable_dataset_kernel.cc | 4 +-- .../core/kernels/bigtable/serialization.cc | 9 ++---- .../core/kernels/bigtable/serialization.h | 30 +++++++++---------- tests/test_bigtable/test_serialization.py | 2 +- 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index f4c368bb0..3ec31541f 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -111,10 +111,10 @@ http_archive( # Note com_google_googleapis is placed earlier as we need to adjust switched_rules_by_language option # Note we have to change one word in the field_behavior.proto so it compiles on WINDOWS # for more infor please refer to https://github.com/protocolbuffers/protobuf/issues/7076 -# Because of a bug in protocol buffers (protocolbuffers/protobuf#7076), new versions of this project -# fail to compile on Windows. The problem hinges on OPTIONAL being defined as an empty string under -# Windows. This makes the preprocessor remove every mention of OPTIONAL from the code, which causes -# compilation failures. This temporary workaround renames the name of the protobuf value OPTIONAL to +# Because of a bug in protocol buffers (protocolbuffers/protobuf#7076), new versions of this project +# fail to compile on Windows. The problem hinges on OPTIONAL being defined as an empty string under +# Windows. This makes the preprocessor remove every mention of OPTIONAL from the code, which causes +# compilation failures. This temporary workaround renames the name of the protobuf value OPTIONAL to # OPIONAL. This should be safe as it does not affect the generated protobufs. http_archive( name = "com_google_googleapis", diff --git a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc index 4a9a512de..596da7396 100644 --- a/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc +++ b/tensorflow_io/core/kernels/bigtable/bigtable_dataset_kernel.cc @@ -185,8 +185,8 @@ class Iterator : public DatasetIterator { const auto column_idx = column_to_idx_.find(key); if (column_idx != column_to_idx_.end()) { VLOG(1) << "getting column:" << column_idx->second; - TF_RETURN_IF_ERROR(io::PutCellValueInTensor( - res, column_idx->second, dtype, cell)); + TF_RETURN_IF_ERROR( + io::PutCellValueInTensor(res, column_idx->second, dtype, cell)); } else { LOG(ERROR) << "column " << cell.family_name() << ":" << cell.column_qualifier() diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 114614417..e299f978e 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -29,7 +29,6 @@ namespace { #include - inline StatusOr BytesToInt32(const cbt::Cell& cell) { std::string const& bytes = cell.value(); union { @@ -90,7 +89,6 @@ inline StatusOr BytesToDouble(const cbt::Cell& cell) { return u.res; } - #else // _WIN32 #include "rpc/xdr.h" @@ -154,10 +152,8 @@ inline StatusOr BytesToBool(const cbt::Cell& cell) { } // namespace - -Status PutCellValueInTensor( - Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell) { +Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell) { switch (cell_type) { case DT_STRING: { auto tensor_data = tensor.tensor(); @@ -209,6 +205,5 @@ Status PutCellValueInTensor( return Status::OK(); } - } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 494782e63..9db612b07 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -23,21 +23,21 @@ limitations under the License. namespace tensorflow { namespace io { - // Bigtable only stores values as byte buffers - except for int64 the server - // side does not have any notion of types. Tensorflow, needs to store shorter - // integers, floats, doubles, so we needed to decide on how. We chose to - // follow what HBase does, since there is a path for migrating from HBase to - // Bigtable. HBase stores integers as big-endian and floats as IEEE754 - // (also big-endian). Given that integer endianness does not always match - // float endianness, and the fact that there are architectures where it is - // neither little nor big (BE-32), implementing this properly is non-trivial. - // Ideally, we would use a library to do that. XDR matches what HBase does, - // but it is not easily available on Windows, so we decided to go with a - // hybrid approach. On Windows we assume that integer endianness matches float - // endianness and implement the deserialization ourselves and everywhere else - // we use XDR. For that reason we provide two implementations - Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, - google::cloud::bigtable::Cell const& cell); +// Bigtable only stores values as byte buffers - except for int64 the server +// side does not have any notion of types. Tensorflow, needs to store shorter +// integers, floats, doubles, so we needed to decide on how. We chose to +// follow what HBase does, since there is a path for migrating from HBase to +// Bigtable. HBase stores integers as big-endian and floats as IEEE754 +// (also big-endian). Given that integer endianness does not always match +// float endianness, and the fact that there are architectures where it is +// neither little nor big (BE-32), implementing this properly is non-trivial. +// Ideally, we would use a library to do that. XDR matches what HBase does, +// but it is not easily available on Windows, so we decided to go with a +// hybrid approach. On Windows we assume that integer endianness matches float +// endianness and implement the deserialization ourselves and everywhere else +// we use XDR. For that reason we provide two implementations +Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, + google::cloud::bigtable::Cell const& cell); } // namespace io } // namespace tensorflow diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index 9d57a7dfe..ecb216507 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -169,4 +169,4 @@ def test_bool_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "bool", tf.bool) \ No newline at end of file + check_values(self, values, table, "bool", tf.bool) From d15dbe40900150a94c81d146c35abc079cfd74b0 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:48:55 +0100 Subject: [PATCH 19/30] return status in bytesToBool --- tensorflow_io/core/kernels/bigtable/serialization.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index e299f978e..a5523c2be 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -25,7 +25,7 @@ namespace tensorflow { namespace io { namespace { -#ifdef _WIN32 +#ifdef __linux__ #include @@ -53,7 +53,7 @@ inline StatusOr BytesToInt64(const cbt::Cell& cell) { inline StatusOr BytesToBool(const cbt::Cell& cell) { auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { - return int_rep; + return int_rep.status(); } union { bool_t res; From 614bf886125b15162bb5c3e3d5a00c296349cd03 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Mon, 29 Nov 2021 18:50:21 +0100 Subject: [PATCH 20/30] _win32 flag --- tensorflow_io/core/kernels/bigtable/serialization.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index a5523c2be..cde5c092c 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -25,7 +25,7 @@ namespace tensorflow { namespace io { namespace { -#ifdef __linux__ +#ifdef _WIN32 #include From d8148a7c533f6ee38bd783b15d497881116b4dad Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 09:07:04 +0100 Subject: [PATCH 21/30] static cast --- tensorflow_io/core/kernels/bigtable/serialization.cc | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index cde5c092c..e7471ff73 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -50,17 +50,12 @@ inline StatusOr BytesToInt64(const cbt::Cell& cell) { return maybe_value.value(); } -inline StatusOr BytesToBool(const cbt::Cell& cell) { +inline StatusOr BytesToBool(const cbt::Cell& cell) { auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { return int_rep.status(); } - union { - bool_t res; - int32_t int_rep; - } u; - u.int_rep = *int_rep; - return u.res; + return std::static_cast(v.ValueOrDie()); } inline StatusOr BytesToFloat(const cbt::Cell& cell) { From d602a90eae484174288f6bcbc245253b092b8652 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 09:58:55 +0100 Subject: [PATCH 22/30] added bak to sed in WROKSPACE --- WORKSPACE | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index 3ec31541f..0312d977f 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -120,9 +120,9 @@ http_archive( name = "com_google_googleapis", build_file = "@com_github_googleapis_google_cloud_cpp//bazel:googleapis.BUILD", patch_cmds = [ - """sed -i 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", - """sed -i 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", - """sed -i 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", ], sha256 = "a53e15405f81d5a32594d7f6486e649131fadda5431cf28377dff4ae54d45d16", strip_prefix = "googleapis-d4d09eb3aec152015f35717102f9b423988b94f7", From c91d6e4bcd7f55147b2672d6a7cf9e3bd23b6b98 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 10:22:43 +0100 Subject: [PATCH 23/30] static cast no std --- tensorflow_io/core/kernels/bigtable/serialization.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 7e371a4d2..2372c62be 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -55,7 +55,7 @@ inline StatusOr BytesToBool(const cbt::Cell& cell) { if (!int_rep.ok()) { return int_rep.status(); } - return std::static_cast(v.ValueOrDie()); + return static_cast(v.ValueOrDie()); } inline StatusOr BytesToFloat(const cbt::Cell& cell) { From 1daac68cd7f18e2dc0c205655edcff4ba65f655a Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 10:24:44 +0100 Subject: [PATCH 24/30] different value --- tensorflow_io/core/kernels/bigtable/serialization.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 2372c62be..c427e7750 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -55,7 +55,7 @@ inline StatusOr BytesToBool(const cbt::Cell& cell) { if (!int_rep.ok()) { return int_rep.status(); } - return static_cast(v.ValueOrDie()); + return static_cast(int_rep.ValueOrDie()); } inline StatusOr BytesToFloat(const cbt::Cell& cell) { From 05d08afb9f98151166f6492b3127f1ceb23d364c Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 10:29:27 +0100 Subject: [PATCH 25/30] removed status() --- tensorflow_io/core/kernels/bigtable/serialization.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index c427e7750..e069eb7c4 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -53,7 +53,7 @@ inline StatusOr BytesToInt64(const cbt::Cell& cell) { inline StatusOr BytesToBool(const cbt::Cell& cell) { auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { - return int_rep.status(); + return int_rep; } return static_cast(int_rep.ValueOrDie()); } From 58be56e691a2e3222a14b9a86f8f89471e1a5d69 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 11:00:51 +0100 Subject: [PATCH 26/30] changed bool representation --- .../core/kernels/bigtable/serialization.cc | 21 ++++++++++++------- tests/test_bigtable/test_serialization.py | 20 +++++++++--------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index e069eb7c4..67086dbc2 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -51,11 +51,16 @@ inline StatusOr BytesToInt64(const cbt::Cell& cell) { } inline StatusOr BytesToBool(const cbt::Cell& cell) { - auto const int_rep = BytesToInt32(cell); - if (!int_rep.ok()) { - return int_rep; + std::string const& bytes = cell.value(); + union { + char bytes[1]; + int8_t res; + } u; + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); } - return static_cast(int_rep.ValueOrDie()); + memcpy(u.bytes, bytes.data(), 1); + return u.res != 0; } inline StatusOr BytesToFloat(const cbt::Cell& cell) { @@ -133,15 +138,15 @@ inline StatusOr BytesToInt32(const cbt::Cell& cell) { return v; } -inline StatusOr BytesToBool(const cbt::Cell& cell) { +inline StatusOr BytesToBool(const cbt::Cell& cell) { std::string const& s = cell.value(); - bool_t v; + int8_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_bool(&xdrs, &v)) { + if (!xdr_int8_t(&xdrs, &v)) { return errors::InvalidArgument("Error reading bool from byte array."); } - return v; + return v != 0; } #endif // _WIN32 diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index ecb216507..5bab1c13a 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -94,16 +94,16 @@ def setUp(self): b"\x00\x00\x00\x00\x00\x00\x00\x0c", ], "bool": [ - b"\x00\x00\x00\x00", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", - b"\x00\x00\x00\x01", + b'\x00', + b'\xff', + b'\xff', + b'\xff', + b'\xff', + b'\xff', + b'\xff', + b'\xff', + b'\xff', + b'\xff' ], } From 549afad99635400a651ac68f8c93291d9fef9798 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 11:42:33 +0100 Subject: [PATCH 27/30] bytes same for win and linux --- .../core/kernels/bigtable/serialization.cc | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index 67086dbc2..eeb702e42 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -50,19 +50,6 @@ inline StatusOr BytesToInt64(const cbt::Cell& cell) { return maybe_value.value(); } -inline StatusOr BytesToBool(const cbt::Cell& cell) { - std::string const& bytes = cell.value(); - union { - char bytes[1]; - int8_t res; - } u; - if (bytes.size() != 1U) { - return errors::InvalidArgument("Invalid bool representation."); - } - memcpy(u.bytes, bytes.data(), 1); - return u.res != 0; -} - inline StatusOr BytesToFloat(const cbt::Cell& cell) { auto const int_rep = BytesToInt32(cell); if (!int_rep.ok()) { @@ -138,19 +125,21 @@ inline StatusOr BytesToInt32(const cbt::Cell& cell) { return v; } +#endif // _WIN32 + inline StatusOr BytesToBool(const cbt::Cell& cell) { - std::string const& s = cell.value(); - int8_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_int8_t(&xdrs, &v)) { - return errors::InvalidArgument("Error reading bool from byte array."); + std::string const& bytes = cell.value(); + union { + char bytes[1]; + int8_t res; + } u; + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); } - return v != 0; + memcpy(u.bytes, bytes.data(), 1); + return u.res != 0; } -#endif // _WIN32 - } // namespace Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, From d5db2349a8868c01b8a2a30f8440cc446cbde6c6 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 11:44:53 +0100 Subject: [PATCH 28/30] linting --- tests/test_bigtable/test_serialization.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index 5bab1c13a..f7d42f954 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -94,16 +94,16 @@ def setUp(self): b"\x00\x00\x00\x00\x00\x00\x00\x0c", ], "bool": [ - b'\x00', - b'\xff', - b'\xff', - b'\xff', - b'\xff', - b'\xff', - b'\xff', - b'\xff', - b'\xff', - b'\xff' + b"\x00", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", ], } From 0617cb16eea4939f7088e13e42866f294ae3a6c3 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Tue, 30 Nov 2021 12:44:39 +0100 Subject: [PATCH 29/30] check bool one byte --- tensorflow_io/core/kernels/bigtable/serialization.cc | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index eeb702e42..43d5b448e 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -129,15 +129,10 @@ inline StatusOr BytesToInt32(const cbt::Cell& cell) { inline StatusOr BytesToBool(const cbt::Cell& cell) { std::string const& bytes = cell.value(); - union { - char bytes[1]; - int8_t res; - } u; if (bytes.size() != 1U) { return errors::InvalidArgument("Invalid bool representation."); } - memcpy(u.bytes, bytes.data(), 1); - return u.res != 0; + return (*bytes.data()) != 0; } } // namespace From 6656892af726b3d9e71f489efa23ae630e4147fb Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Thu, 2 Dec 2021 19:44:12 +0100 Subject: [PATCH 30/30] list and refactor tests --- .../core/kernels/bigtable/serialization.h | 2 +- tests/test_bigtable/test_serialization.py | 34 ++++++++++--------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 9db612b07..9ef98e951 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -23,7 +23,7 @@ limitations under the License. namespace tensorflow { namespace io { -// Bigtable only stores values as byte buffers - except for int64 the server +// Bigtable only stores byte buffers as values - except for int64 the server // side does not have any notion of types. Tensorflow, needs to store shorter // integers, floats, doubles, so we needed to decide on how. We chose to // follow what HBase does, since there is a path for migrating from HBase to diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py index f7d42f954..704400baf 100644 --- a/tests/test_bigtable/test_serialization.py +++ b/tests/test_bigtable/test_serialization.py @@ -29,18 +29,20 @@ import datetime -def check_values(test_case, values, table, type_name, tf_dtype): - for i, r in enumerate( - table.read_rows( - ["fam1:" + type_name], - row_set=row_set.from_rows_or_ranges(row_range.infinite()), - output_type=tf_dtype, - ) - ): - test_case.assertEqual(values[i].numpy(), r.numpy()[0]) - - class BigtableReadTest(test.TestCase): + def check_values(self, values, table, type_name, tf_dtype): + for i, r in enumerate( + table.read_rows( + ["fam1:" + type_name], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=tf_dtype, + ) + ): + if tf_dtype in [tf.float64, tf.float32]: + self.assertAlmostEqual(values[i].numpy(), r.numpy()[0]) + else: + self.assertEqual(values[i].numpy(), r.numpy()[0]) + def setUp(self): self.emulator = BigtableEmulator() self.data = { @@ -137,7 +139,7 @@ def test_float_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "float", tf.float32) + self.check_values(values, table, "float", tf.float32) def test_double_xdr(self): values = tf.constant(self.data["values"], dtype=tf.float64) @@ -145,7 +147,7 @@ def test_double_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "double", tf.float64) + self.check_values(values, table, "double", tf.float64) def test_int64_xdr(self): values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) @@ -153,7 +155,7 @@ def test_int64_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "int64", tf.int64) + self.check_values(values, table, "int64", tf.int64) def test_int32_xdr(self): values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) @@ -161,7 +163,7 @@ def test_int32_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "int32", tf.int32) + self.check_values(values, table, "int32", tf.int32) def test_bool_xdr(self): values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) @@ -169,4 +171,4 @@ def test_bool_xdr(self): client = BigtableClient("fake_project", "fake_instance") table = client.get_table("test-table") - check_values(self, values, table, "bool", tf.bool) + self.check_values(values, table, "bool", tf.bool)