This repository has been archived by the owner on Nov 23, 2017. It is now read-only.

Can't receive replies to multicast UDP packets #480

Open
gpjt opened this issue Jan 4, 2017 · 6 comments

@gpjt

gpjt commented Jan 4, 2017

It doesn't appear to be possible to receive replies to multicast UDP messages. Server-side multicast does work, but only with a bit of extra config.

Given the following (working) server code, adapted from the UDP Echo example:

import asyncio
import logging
import socket
import struct

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"


class MulticastServerProtocol:

    def connection_made(self, transport):
        self.transport = transport


    def datagram_received(self, data, addr):
        print('Received {!r} from {!r}'.format(data, addr))
        data = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(data, addr))
        self.transport.sendto(data, addr)



loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# The reuse options need to be in place before bind() to take effect.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', BROADCAST_PORT))
group = socket.inet_aton(BROADCAST_ADDR)
# '=' selects standard sizes, so mreq is exactly 8 bytes (struct ip_mreq).
mreq = struct.pack('=4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)

loop.run_forever()
loop.close()

...the following non-asyncio client code sends a broadcast packet and correctly receives the responses:

import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(3)
ttl = struct.pack('b', 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

try:
    sent = sock.sendto(
        sys.argv[1].encode("ascii"),
        (BROADCAST_ADDR, BROADCAST_PORT)
    )

    while True:
        try:
            data, server = sock.recvfrom(1024)
        except socket.timeout:
            break
        else:
            print("Reply from {}: {!r}".format(server, data))

finally:
    sock.close()

However, the following code, which I'm pretty sure is the asyncio equivalent, sends out the multicast packet correctly but never receives a response:

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"



class DiscoveryClientProtocol:
    def __init__(self, loop):
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        sock = self.transport.get_extra_info('socket')
        sock.settimeout(3)
        ttl = struct.pack('b', 1)
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

        self.transport.sendto(sys.argv[1].encode("ascii"))

    def datagram_received(self, data, addr):
        print("Reply from {}: {!r}".format(addr, data))
        # Don't close the socket as we might get multiple responses.

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        self.loop.stop()


loop = asyncio.get_event_loop()
connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop),
    remote_addr=(BROADCAST_ADDR, BROADCAST_PORT),
)
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

@Martiusweb
Member

Hi, I haven't looked in depth, but this:

    sock.settimeout(3)

puts the socket in blocking mode (with a 3-second timeout). asyncio needs a non-blocking socket, so you must call sock.settimeout(0) or sock.setblocking(False) instead.
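
As a minimal standalone sketch of the point above (the variable names are illustrative and not taken from the report), `gettimeout()` shows which mode a socket ends up in after each call:

```python
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# A positive timeout keeps the socket blocking, with a deadline per call.
sock.settimeout(3)
timeout_mode = sock.gettimeout()

# setblocking(False) switches to non-blocking mode, reported as a 0.0 timeout.
sock.setblocking(False)
non_blocking = sock.gettimeout()

# settimeout(0) is the documented equivalent of setblocking(False).
sock.settimeout(0)
also_non_blocking = sock.gettimeout()

sock.close()
print(timeout_mode, non_blocking, also_non_blocking)
```

Only the last two states are suitable for a socket driven by an event loop.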

@gpjt
Author

gpjt commented Jan 5, 2017

I get the same effect if I remove that line, though :-( Likewise if I replace it with sock.settimeout(0) or with sock.setblocking(False)

@jnises

jnises commented Jan 25, 2017

Any news on this?
I seem to be running into the same issue.

@Julius2342

I experience the same problem. What I found out is that the asyncio server works if there is a non-asyncio server running in parallel ...

@vxgmichel

I've been able to reproduce your issue using this code.

The problem is related to asyncio connecting to the broadcast address. Quoting this post:

Now, the main problem is that once you connect() a UDP socket, that effectively
establishes a one-to-one relationship, such that you can only send out data to that
one address, AND you can only receive data from that one address... So, anything
that arrives that is NOT from the address you connect()'d to is dropped... And, of
course, nothing that arrives will actually appear to be FROM the broadcast address
(either the limited or subnet one)... Instead, it'll be from the actual host IP that
really sent it...
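
The filtering described in the quote can be observed without any multicast setup. The following is a rough sketch that uses three unicast UDP sockets on the loopback interface as a stand-in; the names `client`, `peer` and `stranger` and the helper functions are made up for this illustration, and the 0.5 second timeout is an arbitrary choice:

```python
import socket


def udp_socket():
    """Create a UDP socket bound to an ephemeral loopback port."""
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(("127.0.0.1", 0))
    s.settimeout(0.5)
    return s


def try_recv(sock):
    """Return the next datagram, or None if nothing arrives in time."""
    try:
        data, _addr = sock.recvfrom(1024)
        return data
    except socket.timeout:
        return None


client = udp_socket()
peer = udp_socket()
stranger = udp_socket()

# connect() ties the client to a single remote address.
client.connect(peer.getsockname())

# A datagram from any other address is not delivered to the client...
stranger.sendto(b"from stranger", client.getsockname())
dropped = try_recv(client)

# ...while one from the connected peer gets through.
peer.sendto(b"from peer", client.getsockname())
received = try_recv(client)

for s in (client, peer, stranger):
    s.close()

print("stranger:", dropped, "peer:", received)
```

In the multicast case the socket is connected to the group address, while replies come from each server's own address, so every reply plays the role of the stranger here.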

The following patch fixes the issue:

diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index 0174375..5b1256e 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -828,7 +828,8 @@ def create_datagram_endpoint(self, protocol_factory,
                     if local_addr:
                         sock.bind(local_address)
                     if remote_addr:
-                        yield from self.sock_connect(sock, remote_address)
+                        if not allow_broadcast:
+                            yield from self.sock_connect(sock, remote_address)
                         r_addr = remote_address
                 except OSError as exc:
                     if sock is not None:
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index ed2b4d7..02082c8 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -1044,7 +1044,7 @@ def sendto(self, data, addr=None):
             # Attempt to send it right away first.
             try:
                 if self._address:
-                    self._sock.send(data)
+                    self._sock.sendto(data, self._address)
                 else:
                     self._sock.sendto(data, addr)
                 return

There might be a better way though.

vxgmichel pushed a commit to vxgmichel/asyncio that referenced this issue Feb 16, 2017
This fixes issue python#480.
The _SelectorDatagramTransport.sendto method has to be modified
so _sock.sendto is used in all cases (since there is no proper
way to tell if the socket is connected or not). Could that be an
issue for connected sockets?
@frawau

frawau commented Aug 23, 2017

I am not sure of the current status of this discussion, but for the sake of those stumbling upon this thread...

asyncio can handle this. Just create your own socket and pass it to create_datagram_endpoint; asyncio will leave your socket alone. You must also specify the address and port in self.transport.sendto. Here is the client (slightly modified to support IPv6 multicast, in case someone is interested):

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
#BROADCAST_ADDR = "ff0e::10"

class DiscoveryClientProtocol:
    def __init__(self, loop, addr):
        self.loop = loop
        self.transport = None
        self.addr = addr

    def connection_made(self, transport):
        self.transport = transport
        sock = self.transport.get_extra_info('socket')
        # asyncio needs a non-blocking socket; settimeout(3) would make it blocking.
        sock.setblocking(False)
        addrinfo = socket.getaddrinfo(self.addr, None)[0]
        if addrinfo[0] == socket.AF_INET: # IPv4
            ttl = struct.pack('@i', 1)
            sock.setsockopt(socket.IPPROTO_IP, 
                socket.IP_MULTICAST_TTL, ttl)
        else:
            ttl = struct.pack('@i', 2)
            sock.setsockopt(socket.IPPROTO_IPV6, 
                socket.IPV6_MULTICAST_HOPS, ttl)

        self.transport.sendto(sys.argv[1].encode("ascii"), (self.addr,BROADCAST_PORT))

    def datagram_received(self, data, addr):
        print("Reply from {}: {!r}".format(addr, data))
        # Don't close the socket as we might get multiple responses.

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        self.loop.stop()


loop = asyncio.get_event_loop()

addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
sock = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop,BROADCAST_ADDR),
    sock=sock,
)
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

So as to cover the IPv6 bits, here is the server side

import asyncio
import logging
import socket
import struct

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
#BROADCAST_ADDR = "ff0e::10"

class MulticastServerProtocol:

    def connection_made(self, transport):
        self.transport = transport


    def datagram_received(self, data, addr):
        print('Received {!r} from {!r}'.format(data, addr))
        data = "I received {!r}".format(data).encode("ascii")
        print('Send {!r} to {!r}'.format(data, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
sock = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
group_bin = socket.inet_pton(addrinfo[0], addrinfo[4][0])
if addrinfo[0] == socket.AF_INET: # IPv4
    sock.bind(('', BROADCAST_PORT))
    mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
else:
    sock.bind(('', BROADCAST_PORT))
    mreq = group_bin + struct.pack('@I', 0)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)



listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)

loop.run_forever()
loop.close()
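
For readers without a multicast-capable network at hand, the same pattern (an unconnected socket passed via `sock=`, plus an explicit destination in `sendto`) can be tried over loopback. This is only a sketch under assumptions: it uses unicast on 127.0.0.1 instead of a multicast group, the newer `asyncio.run` API, and illustrative class names that do not appear in the code above:

```python
import asyncio
import socket


class EchoServerProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        # Reply to whoever sent the datagram.
        self.transport.sendto(b"I received " + data, addr)


class ClientProtocol(asyncio.DatagramProtocol):
    def __init__(self, target, replies):
        self.target = target
        self.replies = replies

    def connection_made(self, transport):
        # The socket is not connected, so the destination is given explicitly.
        transport.sendto(b"hello", self.target)

    def datagram_received(self, data, addr):
        self.replies.put_nowait((data, addr))


async def main():
    loop = asyncio.get_running_loop()
    server_transport, _ = await loop.create_datagram_endpoint(
        EchoServerProtocol, local_addr=("127.0.0.1", 0))
    target = server_transport.get_extra_info("sockname")

    # Create the client socket ourselves; no remote_addr, so no connect().
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("127.0.0.1", 0))
    sock.setblocking(False)
    replies = asyncio.Queue()
    client_transport, _ = await loop.create_datagram_endpoint(
        lambda: ClientProtocol(target, replies), sock=sock)
    try:
        # Wait for the first reply instead of running the loop forever.
        return await asyncio.wait_for(replies.get(), timeout=3)
    finally:
        client_transport.close()
        server_transport.close()


data, addr = asyncio.run(main())
print("Reply from {}: {!r}".format(addr, data))
```

Because the client socket is never connected, replies are accepted from any source address, which is what the multicast client relies on.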

I think this is a non-issue and should therefore be closed.
