DatagramSocket read stalls on JVM #3406

Open
mpilquist opened this issue Mar 5, 2024 · 2 comments
mpilquist commented Mar 5, 2024

I was looking into the intermittent test failures in #3376 and seem to have found a bug in our support for UDP on the JVM. Consider the following application, which starts a UDP echo server and then loops forever: it creates a client socket, sends a datagram to the server, and waits for a response.

```scala
//> using scala 3.3.3
//> using lib co.fs2::fs2-io::3.9.4

import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all.*
import fs2.{Chunk, Stream}
import fs2.io.net.{Datagram, Network}
import scodec.bits.ByteVector
import scala.concurrent.duration.*
import com.comcast.ip4s.*

object Udp extends IOApp.Simple:

  def run = prg.compile.drain

  def prg =
    Stream.resource(Network[IO].openDatagramSocket()).flatMap: serverSocket =>
      Stream.eval(serverSocket.localAddress).flatMap: serverAddress0 =>
        Stream.eval(Ref.of[IO, Map[Int, Int]](Map.empty)).flatMap: statsRef =>
          val serverAddress = SocketAddress(ip"127.0.0.1", serverAddress0.port)
          val server = serverSocket.reads.evalTap(dg => serverSocket.write(dg))
          val clients = Stream.iterate(0)(_ + 1).evalMap: i =>
            val msg = Chunk.byteVector(ByteVector.fromInt(i))
            Network[IO].openDatagramSocket().use: clientSocket =>
              def loop(attempt: Int): IO[Unit] =
                (if attempt > 0 then IO.println(s"Retrying msg $i attempt $attempt") else IO.unit) >>
                  // Swap the next (active) line for this one to reduce the likelihood of read stalls:
                  // IO.sleep(100.milliseconds * attempt.min(10) + 10.milliseconds) >>
                  IO.sleep(100.milliseconds * attempt.min(10)) >>
                  clientSocket.write(Datagram(serverAddress, msg)) >>
                  clientSocket.read
                    .map(dg => assert(dg.bytes == msg))
                    .flatMap(_ => statsRef.update(_ |+| Map(attempt -> 1)))
                    .timeoutTo(1.seconds, IO.defer(loop(attempt + 1)))
              loop(0)
          val monitor = Stream.awakeEvery[IO](1.second).evalMap(_ => statsRef.get).foreach(stats => IO.println(stats))
          clients.concurrently(server).concurrently(monitor)
```

If a response hasn't been received within a second, the read is cancelled and the message is resent. Retries continue until a datagram has been read successfully from the client socket.
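The retry schedule in the loop can be isolated as a pure function. This is a small sketch that only restates the arithmetic from the `IO.sleep` lines; the names `Backoff`, `retryDelay`, and `retryDelayWithPad` are hypothetical and not part of fs2. Before attempt *n* the client sleeps 100 ms × min(n, 10), so the first send is immediate, and later resends wait at most one second before the one-second read timeout starts.

```scala
import scala.concurrent.duration.*

// Sketch of the delay schedule used by the client loop in this issue.
// Hypothetical names; this is not fs2 API.
object Backoff:
  // Sleep before attempt n: 100 ms per previous attempt, capped at 10 (1 s).
  def retryDelay(attempt: Int): FiniteDuration =
    100.milliseconds * attempt.min(10).toLong

  // The commented-out variant: the same schedule plus 10 ms before every send,
  // including the first one, which the report says makes stalls less likely.
  def retryDelayWithPad(attempt: Int): FiniteDuration =
    retryDelay(attempt) + 10.milliseconds

  def main(args: Array[String]): Unit =
    for attempt <- 0 to 12 do
      val delayMs = retryDelay(attempt).toMillis
      println(s"attempt $attempt: sleep $delayMs ms, send, then read with a 1 s timeout")
```

So a message that needs many retries settles into roughly one resend every two seconds (1 s sleep plus 1 s timeout).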

The program keeps a running tally of the number of attempts it took to receive the echoed response.

Running this results in output like:

```
Map(1 -> 1, 0 -> 91421, 13 -> 1)
```

In this example, there were 91,421 client sockets that received a response to their first attempt, 1 socket that required a single retry, and 1 socket that required 13 retries.
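The tally can be summarized mechanically. A small sketch, assuming the map's keys are the attempt index on which the echo was finally read (0 meaning the first send succeeded) and its values are message counts, as produced by `statsRef.update(_ |+| Map(attempt -> 1))`; `Tally` and `Summary` are hypothetical names.

```scala
// Summarize an attempt -> count tally like the one printed by the monitor.
// Hypothetical helper; keys are the attempt index that finally succeeded.
object Tally:
  final case class Summary(messages: Int, firstTry: Int, retried: Int, extraSends: Int)

  def summarize(stats: Map[Int, Int]): Summary =
    Summary(
      // every completed message is counted exactly once
      messages = stats.values.sum,
      firstTry = stats.getOrElse(0, 0),
      // messages that needed at least one resend
      retried = stats.collect { case (attempt, n) if attempt > 0 => n }.sum,
      // a message that succeeded on attempt k was resent k times
      extraSends = stats.toSeq.map { case (attempt, n) => attempt * n }.sum
    )

  def main(args: Array[String]): Unit =
    println(summarize(Map(1 -> 1, 0 -> 91421, 13 -> 1)))
    // Summary(91423,91421,2,14)
```

Applied to the stalled run's map, `Map(49 -> 1, 0 -> 253825, 177 -> 1, 174 -> 1)`, it reports 253,828 completed messages, which is consistent with the stuck message being index 253828 (indices start at 0).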

When this program is run on the JVM (via `scala-cli run udp.scala`), retries occur more often than UDP packet loss alone would explain. More worrisome, sometimes the retries continue forever. In such cases, the underlying selector key has been marked interested in reads, but the selector never indicates the socket is ready, and manually inspecting the key confirms `readyOps = 0`.
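The key inspection described above can be reproduced in isolation with plain `java.nio`. The sketch below is not fs2's implementation, and `SelectorProbe` is a hypothetical name. It registers a non-blocking `DatagramChannel` for `OP_READ`, has a second, blocking channel echo a datagram back to it, waits up to one second in `select`, and returns the key's `interestOps` and `readyOps`. In the healthy case both contain `OP_READ`; the stall in this issue corresponds to interest including `OP_READ` while `readyOps` stays 0.

```scala
import java.net.{InetAddress, InetSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{DatagramChannel, SelectionKey, Selector}

// Hypothetical standalone probe (not fs2 code): register a client channel for
// OP_READ, echo one datagram to it, and report what the selector says.
object SelectorProbe:
  def probe(): (Int, Int) =
    val loopback = InetAddress.getLoopbackAddress
    val server = DatagramChannel.open().bind(new InetSocketAddress(loopback, 0))
    val client = DatagramChannel.open().bind(new InetSocketAddress(loopback, 0))
    val selector = Selector.open()
    try
      // Channels must be non-blocking before they can be registered.
      client.configureBlocking(false)
      val key = client.register(selector, SelectionKey.OP_READ)
      // Client -> server, then the (blocking) server echoes it back.
      client.send(ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)), server.getLocalAddress)
      val buf = ByteBuffer.allocate(64)
      val from = server.receive(buf)
      buf.flip()
      server.send(buf, from)
      // Wait up to 1 second for the client channel to become readable.
      selector.select(1000L)
      (key.interestOps, key.readyOps)
    finally
      selector.close(); client.close(); server.close()

  def main(args: Array[String]): Unit =
    val (interest, ready) = probe()
    println(s"interestOps=$interest readyOps=$ready")
```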

Some sample outputs:

Stalled example:

```
Retrying msg 253828 attempt 555
Map(49 -> 1, 0 -> 253825, 177 -> 1, 174 -> 1)
```

Non-stalled example, but with extensive retries:

```
Map(0 -> 474809, 14 -> 1, 121 -> 1, 89 -> 1, 137 -> 1, 7 -> 1, 95 -> 1, 11 -> 1, 104 -> 1, 94 -> 1, 185 -> 1, 42 -> 1)
```

Adding a small delay after opening the client socket and before writing to it decreases the likelihood of a stall, which seems to point to some kind of race condition at socket creation or registration time.

Running the same program on Scala.js (via `scala-cli run --js --js-module-kind es udp.scala`) results in no drops and no stalls.

mpilquist added the bug label Mar 5, 2024
@armanbilge

> In such cases, the underlying selector key has been marked interested in reads but the selector never indicates the socket is ready, and manually inspecting the key confirms readyOps = 0.

What about actually attempting to read from the socket? That is, is it possible there is something to read on the socket even though the selector never reports it as read-ready?

@mpilquist

Yeah, our implementation attempts the read here:

`read1` calls `channel.receive`. No luck.
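The direct check discussed in this exchange, asking the channel for a datagram without waiting on the selector, can be sketched with plain `java.nio`. This is a hypothetical helper named `tryReceive`, not fs2's `read1`. On a non-blocking `DatagramChannel`, `receive` returns `null` immediately when no datagram is queued, so a `None` here while the selector also reports nothing would be consistent with the datagram never reaching the socket, whereas a `Some` would point at missed readiness.

```scala
import java.nio.ByteBuffer
import java.nio.channels.DatagramChannel

// Hypothetical helper (not fs2 API): poll a non-blocking channel directly,
// bypassing the selector. Returns the payload of one queued datagram, if any.
def tryReceive(channel: DatagramChannel, maxSize: Int = 65535): Option[Array[Byte]] =
  // Channels registered with a Selector are non-blocking; in blocking mode,
  // receive would wait for a datagram instead of returning null.
  require(!channel.isBlocking, "channel must be in non-blocking mode")
  val buf = ByteBuffer.allocate(maxSize)
  // receive returns the sender's address, or null if nothing is available.
  val sender = channel.receive(buf)
  if sender == null then None
  else
    buf.flip()
    val bytes = new Array[Byte](buf.remaining)
    buf.get(bytes)
    Some(bytes)
```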
