From 60b8912c9d51e71f1eaef1856dfd012f05743e4a Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 28 May 2020 12:10:24 +0300 Subject: [PATCH 1/4] refactor: incorporate will_accept() checks into publish() --- .../pubsub_v1/publisher/_batch/thread.py | 4 +-- .../pubsub_v1/publisher/batch/test_base.py | 30 ------------------- .../pubsub_v1/publisher/batch/test_thread.py | 14 --------- .../publisher/test_publisher_client.py | 3 -- 4 files changed, 2 insertions(+), 49 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 73210011d..67c9f2de3 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -333,8 +333,8 @@ def publish(self, message): self._status != base.BatchStatus.ERROR ), "Publish after stop() or publish error." - if not self.will_accept(message): - return future + if self.status != base.BatchStatus.ACCEPTING_MESSAGES: + return size_increase = types.PublishRequest(messages=[message]).ByteSize() diff --git a/tests/unit/pubsub_v1/publisher/batch/test_base.py b/tests/unit/pubsub_v1/publisher/batch/test_base.py index 96f18451d..f10b54ee5 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_base.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_base.py @@ -46,33 +46,3 @@ def test_len(): assert len(batch) == 0 batch.publish(types.PubsubMessage(data=b"foo")) assert len(batch) == 1 - - -def test_will_accept(): - batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES) - message = types.PubsubMessage() - assert batch.will_accept(message) is True - - -def test_will_accept_oversize(): - batch = create_batch( - settings=types.BatchSettings(max_bytes=10), - status=BatchStatus.ACCEPTING_MESSAGES, - ) - message = types.PubsubMessage(data=b"abcdefghijklmnopqrstuvwxyz") - assert batch.will_accept(message) is True - - -def test_will_not_accept_status(): - batch = create_batch(status="talk to the hand") - message = types.PubsubMessage() - assert batch.will_accept(message) is False - - -def test_will_not_accept_number(): - batch = create_batch( - settings=types.BatchSettings(max_messages=-1), - status=BatchStatus.ACCEPTING_MESSAGES, - ) - message = types.PubsubMessage(data=b"abc") - assert batch.will_accept(message) is False diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index ce288a48e..2fa6ded55 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -287,20 +287,6 @@ def test_publish_updating_batch_size(): assert batch.size > 0 # I do not always trust protobuf. -def test_publish_not_will_accept(): - batch = create_batch(topic="topic_foo", max_messages=0) - base_request_size = types.PublishRequest(topic="topic_foo").ByteSize() - - # Publish the message. - message = types.PubsubMessage(data=b"foobarbaz") - future = batch.publish(message) - - assert future is None - assert batch.size == base_request_size - assert batch.messages == [] - assert batch._futures == [] - - def test_publish_exceed_max_messages(): max_messages = 4 batch = create_batch(max_messages=max_messages) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 4ca979892..9c7cf7eb9 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -128,7 +128,6 @@ def test_publish(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) # Set the mock up to claim indiscriminately that it accepts all messages. - batch.will_accept.return_value = True batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) topic = "topic/path" @@ -185,7 +184,6 @@ def test_publish_attrs_bytestring(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) # Set the mock up to claim indiscriminately that it accepts all messages. - batch.will_accept.return_value = True topic = "topic/path" client._set_batch(topic, batch) @@ -391,7 +389,6 @@ def test_publish_with_ordering_key(): # Use a mock in lieu of the actual batch class. batch = mock.Mock(spec=client._batch_class) # Set the mock up to claim indiscriminately that it accepts all messages. - batch.will_accept.return_value = True batch.publish.side_effect = (mock.sentinel.future1, mock.sentinel.future2) topic = "topic/path" From 1377eecb509de15a6c605522715a413099d77a14 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 28 May 2020 12:18:05 +0300 Subject: [PATCH 2/4] delete will_accept() method --- .../cloud/pubsub_v1/publisher/_batch/base.py | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 53d3dee5b..212a4b277 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -109,32 +109,6 @@ def status(self): """ raise NotImplementedError - def will_accept(self, message): - """Return True if the batch is able to accept the message. - - In concurrent implementations, the attributes on the current batch - may be modified by other workers. With this in mind, the caller will - likely want to hold a lock that will make sure the state remains - the same after the "will accept?" question is answered. - - Args: - message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message. - - Returns: - bool: Whether this batch can accept the message. - """ - # If this batch is not accepting messages generally, return False. - if self.status != BatchStatus.ACCEPTING_MESSAGES: - return False - - # If this message will make the batch exceed the ``max_messages`` - # setting, return False. - if len(self.messages) >= self.settings.max_messages: - return False - - # Okay, everything is good. - return True - def cancel(self, cancellation_reason): """Complete pending futures with an exception. From 4f64faa9d290ec54889a3fb89b1a76b28d290f8c Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 2 Jun 2020 13:27:58 +0300 Subject: [PATCH 3/4] add unit tests --- .../pubsub_v1/publisher/batch/test_thread.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 2fa6ded55..70662f89f 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -287,6 +287,45 @@ def test_publish_updating_batch_size(): assert batch.size > 0 # I do not always trust protobuf. +def test_publish(): + batch = create_batch() + message = types.PubsubMessage() + future = batch.publish(message) + + assert len(batch.messages) == 1 + assert batch._futures == [future] + + +def test_publish_max_messages_enforced(): + batch = create_batch(topic="topic_foo", max_messages=1) + + message = types.PubsubMessage(data=b"foobarbaz") + message2 = types.PubsubMessage(data=b"foobarbaz2") + + future = batch.publish(message) + future2 = batch.publish(message2) + + assert future is not None + assert future2 is None + assert len(batch.messages) == 1 + assert len(batch._futures) == 1 + + +def test_publish_max_bytes_enforced(): + batch = create_batch(topic="topic_foo", max_bytes=15) + + message = types.PubsubMessage(data=b"foobarbaz") + message2 = types.PubsubMessage(data=b"foobarbaz2") + + future = batch.publish(message) + future2 = batch.publish(message2) + + assert future is not None + assert future2 is None + assert len(batch.messages) == 1 + assert len(batch._futures) == 1 + + def test_publish_exceed_max_messages(): max_messages = 4 batch = create_batch(max_messages=max_messages) From 70b7cd5a45d4cd25f8b9ec05c9dc2fc348997ab9 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 5 Jun 2020 12:13:18 +0300 Subject: [PATCH 4/4] add zero max messages unit test --- tests/unit/pubsub_v1/publisher/batch/test_thread.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 70662f89f..e9d2b09c0 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -296,6 +296,19 @@ def test_publish(): assert batch._futures == [future] +def test_publish_max_messages_zero(): + batch = create_batch(topic="topic_foo", max_messages=0) + + message = types.PubsubMessage(data=b"foobarbaz") + with mock.patch.object(batch, "commit") as commit: + future = batch.publish(message) + + assert future is not None + assert len(batch.messages) == 1 + assert batch._futures == [future] + commit.assert_called_once() + + def test_publish_max_messages_enforced(): batch = create_batch(topic="topic_foo", max_messages=1)