
Timeout of fetch (pull mode) with no data in queue #809

Closed
izarraga opened this issue Sep 1, 2021 · 10 comments · Fixed by #813
Labels: bug (Confirmed reproducible bug)

izarraga commented Sep 1, 2021

After upgrading to server 2.4.0 and Go client 1.12, something strange happens with the Fetch timeouts in pull mode. At first it works correctly, but after a few minutes the timeout drops to 0.

The workaround was to add this line (it also appears below, inside the Fetch loop):

    time.Sleep(2000 * time.Millisecond)  //  ************ I had to add this line ***************


    for {
        sub, err := js.PullSubscribe(
                                    NATS_SUBJECT_DBCODE_SLOW,
                                    NATS_DURABLE_DBCODE_SLOW,
                                    nats.ManualAck(),
                                    nats.DeliverAll(),
                                    nats.MaxDeliver(10000),
                                    nats.BindStream(NATS_STREAM_NAME),
                                    )

        if err != nil {
            log_to_file(LOG_INFO, "NATS_PULL_DBCODE_SLOW: js.PullSubscribe() err: %v", err)
        }

        for {
            msgs, err := sub.Fetch(1)

            if err != nil {
                log_to_file(LOG_INFO, "NATS_DBCODE_SLOW_PULL: sub.Fetch() err: %v", err)
                if err == nats.ErrTimeout {
                    time.Sleep(2000 * time.Millisecond)  //  ************ I had to add this line ***************
                    continue
                }

                break
            }

            for _, msg := range msgs {
                log_to_file(LOG_INFO, "NATS_PULL_DBCODE_SLOW: subject:%s, msg:%s", msg.Subject, msg.Data)
                process_dbcode_slow_msg(msg)
            }
        }

        time.Sleep(1 * time.Second)
        sub.Unsubscribe()
        log_to_file(LOG_INFO, "NATS_PULL_DBCODE_SLOW: new js.PullSubscribe")
    }
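
(For reference, and not part of the original report: the wait that Fetch uses before returning nats: timeout can also be set explicitly per call with nats.MaxWait; the five-second value below is only an example.)

    // Minimal sketch: make the Fetch wait explicit. With the behavior described
    // above, the nats: timeout error started coming back immediately instead of
    // after the expected wait.
    msgs, err := sub.Fetch(1, nats.MaxWait(5*time.Second))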
izarraga added the bug (Confirmed reproducible bug) label on Sep 1, 2021
kozlovic (Member) commented Sep 1, 2021

@izarraga Could you give more information about how the stream is created? Also, it looks like process_dbcode_slow_msg, based on the name, will be slow to process/ack each message. Does it take longer than the default ack wait (which is 30 seconds)? Is this callback acknowledging the message?

How would we go about reproducing this? Have some messages (how many?) in the stream (again, please provide details about this stream) and have the message-processing function delay for a few seconds and then ack the message?
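
(For reference: the ack wait mentioned above is a consumer setting; in the Go client it can be raised when the pull subscription is created. A minimal sketch reusing the names from the report, with a one-minute value chosen purely as an example; for an existing durable, the value must match the consumer's configuration or the consumer must be updated/recreated.)

    // Minimal sketch: give the consumer a longer ack wait so slow processing in
    // process_dbcode_slow_msg does not trigger redelivery before the ack is sent.
    sub, err := js.PullSubscribe(
                                NATS_SUBJECT_DBCODE_SLOW,
                                NATS_DURABLE_DBCODE_SLOW,
                                nats.ManualAck(),
                                nats.AckWait(60*time.Second), // example value; the default is 30s
                                nats.BindStream(NATS_STREAM_NAME),
                                )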

izarraga (Author) commented Sep 1, 2021

  • The callback is acknowledging the message.

  • I chose this stream because it has very low traffic; it also happens without any traffic (the other streams have the same problem).

Now I have 2 seconds between log lines:

2021/09/01 17:05:58 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:00 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:02 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:04 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:06 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:08 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:06:10 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout

If I restart, there are 7 seconds between lines (this lasts for a while, perhaps 1-2 hours; I have not measured it):

2021/09/01 17:07:33 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:07:40 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:07:47 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:07:54 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout
2021/09/01 17:08:01 [info] NATS_DBCODE_SLOW_PULL: sub.Fetch() err: nats: timeout

func natsConnect() (nats.JetStreamContext) {

var err error
nc, err = nats.Connect(
                opt.broker,
                nats.Name(NATS_CLIENT_NAME),
                nats.RetryOnFailedConnect(true),
                nats.MaxReconnects(-1),
                nats.ReconnectWait(3 * time.Second),
                nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
                    if !nc.IsClosed() {
                        fmt.Printf("nats disconnected due to: %s\n", err)
                        log_to_file(LOG_WARN, "nats disconnected due to: %s", err)
                    }
                }),
                nats.ReconnectHandler(func(nc *nats.Conn) {
                    fmt.Printf("nats reconnected [%s]", nc.ConnectedUrl())
                    log_to_file(LOG_INFO, "nats reconnected [%s]", nc.ConnectedUrl())
                }),
                nats.ClosedHandler(func(nc *nats.Conn) {
                    if !nc.IsClosed() {
                        fmt.Printf("nats Exiting: no servers available\n")
                        log_to_file(LOG_INFO, "nats Exiting: no servers available")
                    } else {
                        fmt.Printf("nats Exiting\n")
                        log_to_file(LOG_INFO, "nats Exiting")
                    }
                }))

if err != nil {
    fmt.Printf("nats nats.Connect() err: %v\n", err)
    log_to_file(LOG_INFO, "nats nats.Connect() err: %v", err)
}
//defer ncPost.Close()

js, err := nc.JetStream()
if err != nil {
    fmt.Printf("nats nc.JetStram error: %v\n", err)
    log_to_file(LOG_INFO, "nats nc.JetStram error: %v", err)
}

stream, err := js.StreamInfo(NATS_STREAM_NAME)
if err != nil {
    fmt.Printf("nats js.StreamInfo error: %v\n", err)
    log_to_file(LOG_INFO, "nats js.StreamInfo error: %v", err)
}

if stream == nil {
    fmt.Printf("nats creating stream %q\n", NATS_STREAM_NAME)
    log_to_file(LOG_INFO, "nats creating stream %q", NATS_STREAM_NAME)
    _, err = js.AddStream(&nats.StreamConfig{
        Name:           NATS_STREAM_NAME,
        Subjects:       []string{
                                NATS_SUBJECT_TRACK,
                                NATS_SUBJECT_SENSOR,
                                NATS_SUBJECT_VEHICLEDATA,
                                NATS_SUBJECT_DBCODE,
                                NATS_SUBJECT_DBCODE_SLOW,
                                },
        NoAck:          false,
        //Retention:      nats.LimitsPolicy,    // to multiple consumers
        Retention:      nats.WorkQueuePolicy,   // to delete messages as they are acked
        Duplicates:     15 * time.Minute,
        Storage:        nats.FileStorage,
        Discard:        nats.DiscardOld,
        MaxAge:         7 * 24 * time.Hour,
        MaxConsumers:   -1,
        MaxMsgSize:     -1,
        Replicas:       1,
        MaxMsgs:        -1,
        MaxBytes:       419430400,   // 419430400 = 400 MB
    })

    if err != nil {
        fmt.Printf("nats js.AddStream error: %v\n", err)
        log_to_file(LOG_INFO, "nats js.AddStream error: %v", err)
        os.Exit(1)
    }
}

return js

}

kozlovic (Member) commented Sep 1, 2021

Thanks, I will see if I can reproduce this.

kozlovic (Member) commented Sep 1, 2021

By the way, are you running the server in cluster mode (if so, how many) or standalone?

izarraga (Author) commented Sep 1, 2021

Standalone. Compiled with Go 1.17.

kozlovic (Member) commented Sep 1, 2021

I can reproduce this; it seems to happen when "Waiting Pulls" has reached its maximum:

$ nats c info TEST dur
...
State:

   Last Delivered Message: Consumer sequence: 4 Stream sequence: 4
     Acknowledgment floor: Consumer sequence: 4 Stream sequence: 4
         Outstanding Acks: 0 out of maximum 65536
     Redelivered Messages: 0
     Unprocessed Messages: 0
            Waiting Pulls: 512 of maximum 512
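
(For reference: the "Waiting Pulls" maximum is a consumer setting, MaxWaiting, whose default is the 512 shown above. In the Go client it can be raised with nats.PullMaxWaiting when the pull subscription is created; this only postpones hitting the limit and is not the actual fix, and the value below is just an example.)

    // Minimal sketch, not the fix: create the pull subscription with a larger
    // waiting-pull queue. 1024 is an arbitrary example value.
    sub, err := js.PullSubscribe(
                                NATS_SUBJECT_DBCODE_SLOW,
                                NATS_DURABLE_DBCODE_SLOW,
                                nats.PullMaxWaiting(1024),
                                nats.BindStream(NATS_STREAM_NAME),
                                )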

kozlovic (Member) commented Sep 1, 2021

Working on a fix asap and will update this ticket when the fix is in. Sorry for the inconvenience.

kozlovic added a commit that referenced this issue Sep 2, 2021
This would happen when pull requests would have filled the waiting
queue in the JetStream consumer and a 408 status was returned.

Resolves #809

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic (Member) commented Sep 2, 2021

@izarraga I think I have fixed this issue. If you could use the library from the branch of PR #813 (or from the main branch once it is merged) and double-check that it also solves what you are seeing, it would be appreciated. Thanks!

izarraga (Author) commented Sep 2, 2021

I have used the library from the branch and it now seems to work OK (3 hours running). Many thanks for creating the patch so quickly.

kozlovic (Member) commented Sep 2, 2021

@izarraga Thanks for checking! I am going to do a v1.12.1 release today, because I think this issue warrants it.
