From fb0981421dc55d18d9ef8238fe89ac4e7b5529f4 Mon Sep 17 00:00:00 2001 From: Kajetan Boroszko Date: Fri, 26 Nov 2021 14:52:58 +0100 Subject: [PATCH] fix: building on windows (#12) --- WORKSPACE | 12 ++ .../core/kernels/bigtable/serialization.cc | 98 ++++++++-- .../core/kernels/bigtable/serialization.h | 17 +- tests/test_bigtable/test_serialization.py | 174 ++++++++++++++++++ 4 files changed, 280 insertions(+), 21 deletions(-) create mode 100644 tests/test_bigtable/test_serialization.py diff --git a/WORKSPACE b/WORKSPACE index e421f779b..c83f45fb8 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -59,9 +59,21 @@ http_archive( ) # Note com_google_googleapis is placed earlier as we need to adjust switched_rules_by_language option +# Note we have to change one word in the field_behavior.proto so it compiles on WINDOWS +# for more infor please refer to https://github.com/protocolbuffers/protobuf/issues/7076 +# Because of a bug in protocol buffers (protocolbuffers/protobuf#7076), new versions of this project +# fail to compile on Windows. The problem hinges on OPTIONAL being defined as an empty string under +# Windows. This makes the preprocessor remove every mention of OPTIONAL from the code, which causes +# compilation failures. This temporary workaround renames the name of the protobuf value OPTIONAL to +# OPIONAL. This should be safe as it does not affect the generated protobufs. http_archive( name = "com_google_googleapis", build_file = "@com_github_googleapis_google_cloud_cpp//bazel:googleapis.BUILD", + patch_cmds = [ + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/api/field_behavior.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1beta2/pubsub.proto""", + """sed -i.bak 's/OPTIONAL/OPIONAL/g' google/pubsub/v1/pubsub.proto""", + ], sha256 = "a53e15405f81d5a32594d7f6486e649131fadda5431cf28377dff4ae54d45d16", strip_prefix = "googleapis-d4d09eb3aec152015f35717102f9b423988b94f7", urls = [ diff --git a/tensorflow_io/core/kernels/bigtable/serialization.cc b/tensorflow_io/core/kernels/bigtable/serialization.cc index aa14fdefc..43d5b448e 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.cc +++ b/tensorflow_io/core/kernels/bigtable/serialization.cc @@ -16,14 +16,73 @@ limitations under the License. #include "tensorflow_io/core/kernels/bigtable/serialization.h" -#include "rpc/xdr.h" #include "tensorflow/core/platform/errors.h" #include "tensorflow/core/platform/statusor.h" +namespace cbt = ::google::cloud::bigtable; + namespace tensorflow { namespace io { +namespace { + +#ifdef _WIN32 + +#include + +inline StatusOr BytesToInt32(const cbt::Cell& cell) { + std::string const& bytes = cell.value(); + union { + char bytes[4]; + int32_t res; + } u; + if (bytes.size() != 4U) { + return errors::InvalidArgument("Invalid int32 representation."); + } + memcpy(u.bytes, bytes.data(), 4); + return ntohl(u.res); +} + +inline StatusOr BytesToInt64(const cbt::Cell& cell) { + auto maybe_value = cell.decode_big_endian_integer(); + if (!maybe_value.ok()) { + return errors::InvalidArgument("Invalid int32 representation."); + } + return maybe_value.value(); +} + +inline StatusOr BytesToFloat(const cbt::Cell& cell) { + auto const int_rep = BytesToInt32(cell); + if (!int_rep.ok()) { + return int_rep; + } + union { + float res; + int32_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + +inline StatusOr BytesToDouble(const cbt::Cell& cell) { + auto const int_rep = BytesToInt64(cell); + if (!int_rep.ok()) { + return int_rep; + } + union { + double res; + int64_t int_rep; + } u; + u.int_rep = *int_rep; + return u.res; +} + +#else // _WIN32 + +#include "rpc/types.h" +#include "rpc/xdr.h" -inline StatusOr BytesToFloat(std::string const& s) { +inline StatusOr BytesToFloat(const cbt::Cell& cell) { + std::string const& s = cell.value(); float v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -33,7 +92,8 @@ inline StatusOr BytesToFloat(std::string const& s) { return v; } -inline StatusOr BytesToDouble(std::string const& s) { +inline StatusOr BytesToDouble(const cbt::Cell& cell) { + std::string const& s = cell.value(); double v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -43,7 +103,8 @@ inline StatusOr BytesToDouble(std::string const& s) { return v; } -inline StatusOr BytesToInt64(std::string const& s) { +inline StatusOr BytesToInt64(const cbt::Cell& cell) { + std::string const& s = cell.value(); int64_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -53,7 +114,8 @@ inline StatusOr BytesToInt64(std::string const& s) { return v; } -inline StatusOr BytesToInt32(std::string const& s) { +inline StatusOr BytesToInt32(const cbt::Cell& cell) { + std::string const& s = cell.value(); int32_t v; XDR xdrs; xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); @@ -63,16 +125,18 @@ inline StatusOr BytesToInt32(std::string const& s) { return v; } -inline StatusOr BytesToBool(std::string const& s) { - bool_t v; - XDR xdrs; - xdrmem_create(&xdrs, const_cast(s.data()), sizeof(v), XDR_DECODE); - if (!xdr_bool(&xdrs, &v)) { - return errors::InvalidArgument("Error reading bool from byte array."); +#endif // _WIN32 + +inline StatusOr BytesToBool(const cbt::Cell& cell) { + std::string const& bytes = cell.value(); + if (bytes.size() != 1U) { + return errors::InvalidArgument("Invalid bool representation."); } - return v; + return (*bytes.data()) != 0; } +} // namespace + Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell) { switch (cell_type) { @@ -82,7 +146,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_BOOL: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToBool(cell.value()); + auto maybe_parsed_data = BytesToBool(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -90,7 +154,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT32: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt32(cell.value()); + auto maybe_parsed_data = BytesToInt32(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -98,7 +162,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_INT64: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToInt64(cell.value()); + auto maybe_parsed_data = BytesToInt64(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -106,7 +170,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_FLOAT: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToFloat(cell.value()); + auto maybe_parsed_data = BytesToFloat(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } @@ -114,7 +178,7 @@ Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, } break; case DT_DOUBLE: { auto tensor_data = tensor.tensor(); - auto maybe_parsed_data = BytesToDouble(cell.value()); + auto maybe_parsed_data = BytesToDouble(cell); if (!maybe_parsed_data.ok()) { return maybe_parsed_data.status(); } diff --git a/tensorflow_io/core/kernels/bigtable/serialization.h b/tensorflow_io/core/kernels/bigtable/serialization.h index 962616eaa..9ef98e951 100644 --- a/tensorflow_io/core/kernels/bigtable/serialization.h +++ b/tensorflow_io/core/kernels/bigtable/serialization.h @@ -18,15 +18,24 @@ limitations under the License. #include "google/cloud/bigtable/table.h" #include "tensorflow/core/framework/tensor.h" +#include "tensorflow/core/platform/statusor.h" namespace tensorflow { namespace io { -// Bigtable only stores values as byte buffers - except for int64 the server +// Bigtable only stores byte buffers as values - except for int64 the server // side does not have any notion of types. Tensorflow, needs to store shorter -// integers, floats, doubles, so we needed to decide on how. We chose to follow -// what HBase does, since there is a path for migrating from HBase to Bigtable. -// XDR seems to match what HBase does. +// integers, floats, doubles, so we needed to decide on how. We chose to +// follow what HBase does, since there is a path for migrating from HBase to +// Bigtable. HBase stores integers as big-endian and floats as IEEE754 +// (also big-endian). Given that integer endianness does not always match +// float endianness, and the fact that there are architectures where it is +// neither little nor big (BE-32), implementing this properly is non-trivial. +// Ideally, we would use a library to do that. XDR matches what HBase does, +// but it is not easily available on Windows, so we decided to go with a +// hybrid approach. On Windows we assume that integer endianness matches float +// endianness and implement the deserialization ourselves and everywhere else +// we use XDR. For that reason we provide two implementations Status PutCellValueInTensor(Tensor& tensor, size_t index, DataType cell_type, google::cloud::bigtable::Cell const& cell); diff --git a/tests/test_bigtable/test_serialization.py b/tests/test_bigtable/test_serialization.py new file mode 100644 index 000000000..704400baf --- /dev/null +++ b/tests/test_bigtable/test_serialization.py @@ -0,0 +1,174 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# disable module docstring for tests +# pylint: disable=C0114 +# disable class docstring for tests +# pylint: disable=C0115 + +import os +from .bigtable_emulator import BigtableEmulator +from tensorflow_io.python.ops.bigtable.bigtable_dataset_ops import BigtableClient +import tensorflow_io.python.ops.bigtable.bigtable_row_range as row_range +import tensorflow_io.python.ops.bigtable.bigtable_row_set as row_set +import tensorflow as tf +from tensorflow import test +from google.auth.credentials import AnonymousCredentials +from google.cloud.bigtable import Client +import datetime + + +class BigtableReadTest(test.TestCase): + def check_values(self, values, table, type_name, tf_dtype): + for i, r in enumerate( + table.read_rows( + ["fam1:" + type_name], + row_set=row_set.from_rows_or_ranges(row_range.infinite()), + output_type=tf_dtype, + ) + ): + if tf_dtype in [tf.float64, tf.float32]: + self.assertAlmostEqual(values[i].numpy(), r.numpy()[0]) + else: + self.assertEqual(values[i].numpy(), r.numpy()[0]) + + def setUp(self): + self.emulator = BigtableEmulator() + self.data = { + "values": [i * 10 / 7 for i in range(10)], + "float": [ + b"\x00\x00\x00\x00", + b"?\xb6\xdbn", + b"@6\xdbn", + b"@\x89$\x92", + b"@\xb6\xdbn", + b"@\xe4\x92I", + b"A\t$\x92", + b"A \x00\x00", + b"A6\xdbn", + b"AM\xb6\xdb", + ], + "double": [ + b"\x00\x00\x00\x00\x00\x00\x00\x00", + b"?\xf6\xdbm\xb6\xdbm\xb7", + b"@\x06\xdbm\xb6\xdbm\xb7", + b"@\x11$\x92I$\x92I", + b"@\x16\xdbm\xb6\xdbm\xb7", + b"@\x1c\x92I$\x92I%", + b"@!$\x92I$\x92I", + b"@$\x00\x00\x00\x00\x00\x00", + b"@&\xdbm\xb6\xdbm\xb7", + b"@)\xb6\xdbm\xb6\xdbn", + ], + "int32": [ + b"\x00\x00\x00\x00", + b"\x00\x00\x00\x01", + b"\x00\x00\x00\x02", + b"\x00\x00\x00\x04", + b"\x00\x00\x00\x05", + b"\x00\x00\x00\x07", + b"\x00\x00\x00\x08", + b"\x00\x00\x00\n", + b"\x00\x00\x00\x0b", + b"\x00\x00\x00\x0c", + ], + "int64": [ + b"\x00\x00\x00\x00\x00\x00\x00\x00", + b"\x00\x00\x00\x00\x00\x00\x00\x01", + b"\x00\x00\x00\x00\x00\x00\x00\x02", + b"\x00\x00\x00\x00\x00\x00\x00\x04", + b"\x00\x00\x00\x00\x00\x00\x00\x05", + b"\x00\x00\x00\x00\x00\x00\x00\x07", + b"\x00\x00\x00\x00\x00\x00\x00\x08", + b"\x00\x00\x00\x00\x00\x00\x00\n", + b"\x00\x00\x00\x00\x00\x00\x00\x0b", + b"\x00\x00\x00\x00\x00\x00\x00\x0c", + ], + "bool": [ + b"\x00", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + b"\xff", + ], + } + + os.environ["BIGTABLE_EMULATOR_HOST"] = self.emulator.get_addr() + self.emulator.create_table( + "fake_project", "fake_instance", "test-table", ["fam1"] + ) + + client = Client( + project="fake_project", credentials=AnonymousCredentials(), admin=True + ) + table = client.instance("fake_instance").table("test-table") + + for type_name in ["float", "double", "int32", "int64", "bool"]: + rows = [] + for i, value in enumerate(self.data[type_name]): + row_key = "row" + str(i).rjust(3, "0") + row = table.direct_row(row_key) + row.set_cell( + "fam1", type_name, value, timestamp=datetime.datetime.utcnow() + ) + rows.append(row) + table.mutate_rows(rows) + + def tearDown(self): + self.emulator.stop() + + def test_float_xdr(self): + values = tf.constant(self.data["values"], dtype=tf.float32) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.check_values(values, table, "float", tf.float32) + + def test_double_xdr(self): + values = tf.constant(self.data["values"], dtype=tf.float64) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.check_values(values, table, "double", tf.float64) + + def test_int64_xdr(self): + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int64) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.check_values(values, table, "int64", tf.int64) + + def test_int32_xdr(self): + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.int32) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.check_values(values, table, "int32", tf.int32) + + def test_bool_xdr(self): + values = tf.cast(tf.constant(self.data["values"]), dtype=tf.bool) + + client = BigtableClient("fake_project", "fake_instance") + table = client.get_table("test-table") + + self.check_values(values, table, "bool", tf.bool)