Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Promises for commands in flight do not return when the connection is dropped #150

Open
ericchapman opened this issue Aug 20, 2019 · 4 comments

Comments

@ericchapman
Copy link
Contributor

Hey @Mordil. I am using v3.4.0 (the Vapor 3.0/Redis NIO implementation) and noticed that when requests are in flight and the connection gets dropped, the promise is not getting fulfilled with an error (or anything for that matter), so I never know it happened. The system just hangs.

Easiest way to reproduce is to do a long "brpop" command and kill the connection. You'll see that no error is thrown and no response is received. See below

print("DEBUG: sending brpop request")
redisClient.send(RedisData.array(["brpop", "queue_key", "5"].map { RedisData(bulk: $0) })).do { data in
    print("DEBUG: received data \(data)")
}.catch { error in
    print("DEBUG: received error \(error)")
}

If I wait the 5 seconds and let the timeout expire, I will get "nil" back as expected. If I kill the connection before that, I get nothing back.

As a workaround, I am going to create a delayed task that will fulfill the promise with an error if I get nothing back by the time it runs, and then cancel it if I do get something back. This feels hacky/expensive but will get the job done for now.

I don't know the NIO code very well, but is there a way to clear out pending promises with an error when the connection closes?

@ericchapman
Copy link
Contributor Author

I got a chance to look at this. I was able to resolve this at the RedisClient level. I did the following

  • In RedisClient -> Send
    • Store the promise when it is created
      • Note I used a dictionary since the array "removeObject" is now a pain in Swift
    • Remove the promise when it is fulfilled (future.always)
  • In RedisClient -> Init -> channel.closureFuture.always
    • Iterate through the stored promises and send a "ChannelError"
    • Empty the storage object

Diff below

bash-3.2$ git diff
diff --git a/Sources/Redis/Client/RedisClient.swift b/Sources/Redis/Client/RedisClient.swift
index 177277a..901ec86 100644
--- a/Sources/Redis/Client/RedisClient.swift
+++ b/Sources/Redis/Client/RedisClient.swift
@@ -20,7 +20,10 @@ public final class RedisClient: DatabaseConnection, BasicWorker {
 
     /// The channel
     private let channel: Channel
-
+    
+    /// Stores the inflight promises so they can be fulfilled when the channel drops
+    var inflightPromises: [String:EventLoopPromise<RedisData>] = [:]
+    
     /// Creates a new Redis client on the provided data source and sink.
     init(queue: RedisCommandHandler, channel: Channel) {
         self.queue = queue
@@ -28,6 +31,12 @@ public final class RedisClient: DatabaseConnection, BasicWorker {
         self.extend = [:]
         self.isClosed = false
         channel.closeFuture.always {
+            // send closed error for the promises that have not been fulfilled
+            for promise in self.inflightPromises.values {
+                promise.fail(error: ChannelError.ioOnClosedChannel)
+            }
+            self.inflightPromises.removeAll()
+            
             self.isClosed = true
         }
     }
@@ -55,6 +64,13 @@ public final class RedisClient: DatabaseConnection, BasicWorker {
         // create a new promise to fulfill later
         let promise = eventLoop.newPromise(RedisData.self)
         
+        // logic to store in-flight requests
+        let key = UUID().uuidString
+        self.inflightPromises[key] = promise
+        promise.futureResult.always {
+            self.inflightPromises.removeValue(forKey: key)
+        }
+        
         // write the message and the promise to the channel, which the `RequestResponseHandler` will capture
         return self.channel.writeAndFlush((message, promise))
             .flatMap { return promise.futureResult }

@Mordil If this solution is acceptable, I'll create a PR accordingly. Let me know what you think

@ericchapman
Copy link
Contributor Author

@Mordil I created a PR for this. Use it if you desire. I needed to fix it for my needs regardless.

@Mordil
Copy link
Member

Mordil commented Sep 6, 2019

@ericchapman Sorry for the extremely long delay in response - August has been way too busy for me.

I left a comment on the PR of where the code can live. The primary thing is the handler is missing a good implementation for either channelInactive or something else to respond to the channel now closing - which is honestly a problem upstream w/ SwiftNIO Extras as well

@ericchapman
Copy link
Contributor Author

@Mordil That was what I was looking for! I knew the in-flight promises were stored somewhere already. I just needed to go a level deeper in the code. Let me know if my modified code is correct. And no problem being busy. It happens to all of us.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants