Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade nng version please. #103

Open
sailxjx opened this issue Apr 18, 2022 · 6 comments
Open

Upgrade nng version please. #103

sailxjx opened this issue Apr 18, 2022 · 6 comments

Comments

@sailxjx
Copy link

sailxjx commented Apr 18, 2022

As the author of nng said, the bus protocol should be very much better in recent builds nanomsg/nng#1571 (comment)

But pynng still use the version 1.5 years ago, hope the author can upgrade the dependent nng version.

@codypiersall
Copy link
Owner

Good idea. I'll do this soon.

@wbadart
Copy link

wbadart commented Jun 3, 2022

@codypiersall do you have a rough outline of how you typically go about upgrading the underlying nng version? Would love to help out if I can.

@patrickwolf
Copy link

Is the pynng library still being actively maintained?

@ntakouris
Copy link

Any updates on this @codypiersall ?

@codypiersall
Copy link
Owner

Hmmm, turns out back in April when I said

Good idea. I'll do this soon.

I had no idea what I was talking about.

Unfortunately life has been, well, busy for the last several months, and when it seems there will be a reprieve, it turns out that there isn't. I'm traveling for work for the rest of this month and won't have time to work on this project.

Is the pynng library still being actively maintained?

Hmmm, in some ways a deep ontological question that is. I do have the intention of getting back to this repo. Unfortunately no clue on a timeline.

do you have a rough outline of how you typically go about upgrading the underlying nng version? Would love to help out if I can.

From a clean pynng repo, I

  • Update the NNG_REVISION in setup.py to the revision that I want to test
  • Try a pip install -e . in this repo (in a virtual environment)
  • Fix whatever build issues crop up on my local machine, due to nng API changes or whatever.
  • Push changes to a branch and see if it passes CI

@ntakouris
Copy link

ntakouris commented Oct 12, 2022

@codypiersall before your comment, I had done this process through trial and error. You can find the repo at https://github.com/LAMBDA-AUTOMATA/pynng (don't expect me to maintain anything).

Some tests fail, but our internal test suite passes, _and our field testing personnel confirm that the segfault that was happening on previous builds, has gone away .

UPDATE 17/10/2022: The segfault is still happenning.

I am pretty sure this has something to do with this open nng issue (nanomsg/nng#1523) , where problems seem to go away due to a recent internal rewrite (API stays the same).

However, our internal systems rely on a small Pub0/Sub0 and Req0 Rep0 functionality. Here are the usage examples that it works for:

class Subscriber:
    """
    A class that acts as a subscriber on a given socket address. Subscribers connect
    to the publisher on that address, to receive data bytes via the `receive` function.
    """

    ALL_TOPICS = ""

    def __init__(
        self, address: str, recv_buffer_size: int = 1, recv_timeout_ms: int = 10
    ) -> None:
        self.address = address
        self._socket = None
        self.recv_buffer_size = recv_buffer_size
        self.recv_timeout_ms = recv_timeout_ms

    def __str__(self) -> str:
        return f"Subscriber({self.address})"

    def __enter__(self):
        try:
            self._socket = pynng.Sub0(
                dial=self.address,
                topics=self.ALL_TOPICS,
                recv_timeout=self.recv_timeout_ms,
                recv_buffer_size=self.recv_buffer_size,
                block_on_dial=False,
            )
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"Failed to setup {self}")

        return self

    def __exit__(self, type, value, traceback):
        try:
            self._socket.__exit__(type, value, traceback)
        except BaseException as e:
            logger.error(e)

    async def receive(self) -> Optional[bytes]:
        """
        Receive bytes as a self-contained message from the socket.
        Returns none upon timeout."""
        try:
            msg = await self._socket.arecv_msg()
        except pynng.exceptions.Timeout:
            return None
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"{self} failed to receive message")

        return msg.bytes
class Publisher:
    """
    A class that acts as a publisher on a given socket address. Subscribers connect
    to it, to receive data bytes sent via the `send` function.
    """

    def __init__(self, address: str) -> None:
        self.address = address
        self._socket = None

    def __str__(self) -> str:
        return f"Publisher({self.address})"

    def __enter__(self):
        try:
            self._socket = pynng.Pub0(listen=self.address)
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"Failed to setup {self}")
        return self

    def __exit__(self, type, value, traceback):
        try:
            self._socket.__exit__(type, value, traceback)
        except BaseException as e:
            logger.error(e)

    async def send(self, data: bytes) -> None:
        """
        Sends bytes over the socket as a self-contained message.
        """
        try:
            await self._socket.asend(data)
        except pynng.exceptions.Timeout:
            return None
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"{self} failed to receive message")
class RequestClient:
    """
    A class that acts as a Req0 on a given socket address. Subscribers connect
    to it, to receive data bytes sent via the `send` function.
    """

    EMPTY_REQUEST = b""

    def __init__(self, address: str, recv_timeout_ms: int = 200) -> None:
        self.address = address
        self._socket = None

        self.recv_timeout_ms = recv_timeout_ms

    @staticmethod
    async def simple_request(
        address: str, payload: bytes = None, recv_timeout_ms: int = 200
    ) -> Optional[bytes]:
        payload = payload or RequestClient.EMPTY_REQUEST

        with RequestClient(address=address, recv_timeout_ms=recv_timeout_ms) as req:
            resp_bytes = await req.send(payload)
            if resp_bytes == RequestServer.EMPTY_RESPONSE:
                return None

            return resp_bytes

    def __str__(self) -> str:
        return f"RequestClient({self.address})"

    def __enter__(self):
        try:
            self._socket = pynng.Req0(
                dial=self.address,
                send_timeout=self.recv_timeout_ms,
                recv_timeout=self.recv_timeout_ms,
            )
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"Failed to connect {self}")

        return self

    def __exit__(self, type, value, traceback):
        try:
            self._socket.__exit__(type, value, traceback)
        except BaseException as e:
            logger.error(e)

    async def send(self, data: bytes) -> Optional[bytes]:
        try:
            await self._socket.asend(data)
            response = await self._socket.arecv_msg()
        except pynng.exceptions.Timeout:
            return None
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"{self} failed to communicate")

        return response.bytes
class RequestServer:
    """
    A class that acts as a subscriber on a given socket address. Subscribers connect
    to the publisher on that address, to receive data bytes via the `receive` function.
    """

    ALL_TOPICS = ""
    EMPTY_RESPONSE = b""

    def __init__(
        self, address: str, recv_timeout_ms: int = 200, recv_buffer_size: int = 1
    ) -> None:
        self.address = address
        self._socket = None

        self.recv_timeout_ms = recv_timeout_ms
        self.recv_buffer_size = recv_buffer_size

    def __str__(self) -> str:
        return f"RequestServer({self.address})"

    def __enter__(self):
        try:
            self._socket = pynng.Rep0(
                listen=self.address,
                recv_timeout=self.recv_timeout_ms,
                send_timeout=self.recv_timeout_ms,
                recv_buffer_size=self.recv_buffer_size,
            )
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"Failed to set up {self}")
        return self

    def __exit__(self, type, value, traceback):
        try:
            self._socket.__exit__(type, value, traceback)
        except BaseException as e:
            logger.error(e)

    async def receive(self, send_empty_response: bool = False) -> Union[bytes, None]:
        try:
            msg = await self._socket.arecv_msg()
            if send_empty_response == True:
                await self._socket.asend(self.EMPTY_RESPONSE)
        except pynng.exceptions.Timeout:
            return None
        except BaseException as e:
            logger.error(e)
            raise CommunicationError(f"{self} failed to receive message")

        return msg.bytes

    async def reply(self, data: bytes = None) -> bool:
        data = data or self.EMPTY_RESPONSE

        try:
            await self._socket.asend(data)
        except pynng.exceptions.Timeout:
            return False
        except BaseException as e:
            logger.error(repr(e))
            raise CommunicationError(
                f"{self} failed to reply to message. Reason: {repr(e)}"
            )

        return True

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants