timeout or no response waiting for nats jetstream server #1058

Open
liyancoding opened this issue Jan 4, 2024 · 12 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@liyancoding

Observed behavior

The NATS cluster is healthy: checking the stream status on each node with the NATS CLI shows nothing wrong. The Java client, however, reports the error "timeout or no response waiting for NATS jetstream server". Sometimes the connection recovers, but sometimes the fault persists.

Expected behavior

NATS is expected to work properly

Server and client version

v2.9.23

Host environment

No response

Steps to reproduce

Streams, subjects, and configurations are created or updated when the program starts.

@liyancoding liyancoding added the defect Suspected defect such as a bug or regression label Jan 4, 2024
@scottf
Contributor

scottf commented Jan 4, 2024

What version of the client?
What is the connection timeout (the default)?
What does your network between the client and server look like?
What CLI commands are you running?
Can you give an example of your Java code?

@liyancoding
Author

liyancoding commented Jan 5, 2024

> What version of the client?
> What is the connection timeout (default?)
> What does your network between the client and server look like?
> What cli commands are you running?
> Can you give an example of your java code?

The client version is 2.15.3.

The connection timeout is 2s.

The CLI commands are 'nats str info' and 'nats con info'.

public static void createConsumerInfo(String streamName, String consumer, String subject) throws IOException, JetStreamApiException {
    JetStreamManagement jsm = connection.jetStreamManagement();
    List<String> consumerNames = jsm.getConsumerNames(streamName);
    if (!consumerNames.contains(consumer)) {
        jsm.addOrUpdateConsumer(streamName, createConsumerConfiguration(consumer, subject));
        log.info("consumer infos: {}", jsm.getConsumers(streamName));
        return;
    }
    ConsumerInfo consumerInfo = jsm.getConsumerInfo(streamName, consumer);
    if (!consumerInfo.getConsumerConfiguration().getFilterSubject().equals(subject)) {
        jsm.deleteConsumer(streamName, consumer);
        jsm.addOrUpdateConsumer(streamName, createConsumerConfiguration(consumer, subject));
        log.info("consumer infos: {}", jsm.getConsumers(streamName));
    }
}

public static ConsumerConfiguration createConsumerConfiguration(String consumer, String subject) {
    return ConsumerConfiguration.builder()
            .durable(consumer)
            .filterSubject(subject)
            .replayPolicy(ReplayPolicy.Instant)
            .build();
}

public static PullSubscribeOptions createPullSubscribeOptions(String consumer, String subject) {
    return PullSubscribeOptions
            .builder()
            .configuration(createConsumerConfiguration(consumer, subject))
            .build();
}

public static <T> T subscribeSingleMessage(String stream, String subject, String consumer, Class<T> tClass) {
    try {
        NatsUtils.createConsumerInfo(stream, consumer, subject);
        log.info("url is: '{}'", connection.getConnectedUrl());
        JetStreamSubscription streamSubscription = connection.jetStream().subscribe(subject, createPullSubscribeOptions(consumer, subject));
        List<Message> messageList = streamSubscription.fetch(1, Duration.ofMillis(1000));
        if (CollectionUtils.isEmpty(messageList) || messageList.size() != 1) {
            return null;
        }
        Message message = messageList.get(0);
        message.ack();
        T result = JsonUtils.fromJson(new String(message.getData(), StandardCharsets.UTF_8), tClass);
        return result;
    } catch (IOException e) {
        log.error("failed subscribeSingleMessage due to 'IOException'", e);
        throw new InternalException(HttpStatus.BAD_REQUEST, VtsErrorCode.NATS_SUBSCRIBE_FAILURE, e);
    } catch (JetStreamApiException e) {
        log.error("failed subscribeSingleMessage due to 'JetStreamApiException'", e);
        throw new InternalException(HttpStatus.BAD_REQUEST, VtsErrorCode.NATS_SUBSCRIBE_FAILURE, e);
    }
}

@scottf
Contributor

scottf commented Jan 5, 2024

Can you move to the latest client? 2.15 is pretty old. There could be an issue in that version, but I can't remember.

Also, which line of code is giving the timeout? The getConsumerNames call?

@scottf
Contributor

scottf commented Feb 12, 2024

@liyancoding Are you still able to reproduce this? Is there a main or something that uses the code you provided? I'm trying to reproduce this.

@scottf
Contributor

scottf commented Feb 18, 2024

@liyancoding I have an update on this based on another user report.

The other user's code was doing an async publish, and in the error handler they attached to the future, they tried to call getConsumerInfo and it timed out. (For reference later: getConsumerInfo is just a core request-reply with an API subject and known JSON response data.)

What I discovered is that the problem was in their code: since they were still inside the future's handler, they were already running inside the read-loop of the reader thread. Any core API call made from there blocks the read-loop, so the reply to that call is never read before the timeout.

I am able to repeat this at will now.

There are currently 2 workarounds: 1) use a different connection, since a different connection has a completely different read-loop; 2) make sure you are outside of the future.

Please let me know if this helps. We are in the process of determining whether this is possible to address, but I don't want to add yet another thread. It may be a matter of documentation; at least I understand why it's happening.
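
To illustrate the blocking described above, here is a minimal sketch that uses only java.util.concurrent and no NATS code at all. It is a simulation under stated assumptions, not the client's actual implementation: the `READER` executor is a hypothetical stand-in for the connection's single reader thread, `WORKER` for an application thread, and `request` for any request-reply call whose reply only the reader thread can deliver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReadLoopDemo {
    // Hypothetical stand-ins: single daemon threads playing the "reader" and a "worker".
    static ExecutorService daemonThread(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    static final ExecutorService READER = daemonThread("reader");
    static final ExecutorService WORKER = daemonThread("worker");

    // Simulated request-reply: only the reader thread can deliver the reply.
    static String request(long timeoutMillis) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        READER.execute(() -> reply.complete("reply"));
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static String await(CompletableFuture<String> future) {
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // The callback runs on the reader thread, so the nested request waits
    // for a reply that the same (now blocked) thread would have to deliver.
    static String requestInsideCallback() {
        CompletableFuture<String> ack = new CompletableFuture<>();
        CompletableFuture<String> result = ack.thenApply(a -> request(500));
        READER.execute(() -> ack.complete("ack"));
        return await(result);
    }

    // The callback is handed to the worker thread, so the reader stays free
    // to deliver the reply to the nested request.
    static String requestOutsideCallback() {
        CompletableFuture<String> ack = new CompletableFuture<>();
        CompletableFuture<String> result = ack.thenApplyAsync(a -> request(500), WORKER);
        READER.execute(() -> ack.complete("ack"));
        return await(result);
    }

    public static void main(String[] args) {
        System.out.println("inside callback:  " + requestInsideCallback());
        System.out.println("outside callback: " + requestOutsideCallback());
    }
}
```

In this simulation the first call times out and the second one gets its reply. The second variant corresponds to workaround 2: hand the follow-up work to another thread (here via `thenApplyAsync` with an explicit executor) instead of running it in the handler that the reader thread invokes.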

@z0mb1ek

z0mb1ek commented Apr 21, 2024

I have the same problem when I publish:

java.io.IOException: Timeout or no response waiting for NATS JetStream server
	at io.nats.client.impl.NatsJetStreamImpl.responseRequired(NatsJetStreamImpl.java:249)
	at io.nats.client.impl.NatsJetStreamImpl.makeInternalRequestResponseRequired(NatsJetStreamImpl.java:241)
	at io.nats.client.impl.NatsJetStream.publishSyncInternal(NatsJetStream.java:155)
	at io.nats.client.impl.NatsJetStream.publish(NatsJetStream.java:50)

Can I make natsConnection().jetStream() a bean, or should I call it every time I need to send a message?

@scottf
Contributor

scottf commented Apr 23, 2024

JetStream is really just a functional wrapper around the connection, and there is no cost except instantiation and initialization.

@scottf
Contributor

scottf commented Apr 23, 2024

As far as the issues you have with a timeout, there are at least 2 possible causes.

  1. The server has disconnected or is just too busy (probably not too busy, though).
  2. You are making the calls in an async way such that one blocks the other.

But I'm just speculating. I think I would need to see some more code. Maybe make a gist or put some code in a GitHub repo and link it here.

@majdiAlKotamy

majdiAlKotamy commented May 8, 2024

I'm facing the same problem and am still looking for a solution.

@scottf
Contributor

scottf commented May 8, 2024

> I'm facing the same problem, still looking for solution.

Can you give some more details or explanation about your specific problem, your environment, and your threading? Code examples that reproduce the issue are always helpful. There have been some changes and some options added in the last couple of releases, so they might help your situation.

@majdiAlKotamy

majdiAlKotamy commented May 9, 2024

// Copyright 2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.hello;

import io.nats.client.*;
import io.nats.client.api.PublishAck;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.JsonUtils;

import java.time.Duration;

/*
JetStream Hello World Example using the NATS client for Java
 */
public class HelloWorld
{
    public static void main( String[] args )
    {
        try (Connection nc = Nats.connect("nats://localhost:4222")) {
            JetStreamManagement jsm = nc.jetStreamManagement();

            // Build the configuration
            StreamConfiguration streamConfig = StreamConfiguration.builder()
                    .name("hello")
                    .subjects("world")
                    .storageType(StorageType.Memory)
                    .build();

            // Create the stream
            StreamInfo streamInfo = jsm.addStream(streamConfig);
            JsonUtils.printFormatted(streamInfo);

            JetStream js = nc.jetStream();
            PublishAck ack = js.publish("world", "one".getBytes());
            JsonUtils.printFormatted(ack);

            ack = js.publish("world", "two".getBytes());
            JsonUtils.printFormatted(ack);

            JetStreamSubscription sub = js.subscribe("world");
            Message m = sub.nextMessage(Duration.ofSeconds(3));
            m.ack();
            System.out.println("Message: " + m.getSubject() + " " + new String(m.getData()));
            JsonUtils.printFormatted(m.metaData());

            m = sub.nextMessage(Duration.ofSeconds(3));
            m.ack();
            System.out.println("Message: " + m.getSubject() + " " + new String(m.getData()));
            JsonUtils.printFormatted(m.metaData());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

My dependency: implementation 'io.nats:jnats:2.17.6'

The nats-server is set up and running successfully in Docker locally:

[Screenshot 2024-05-09 111239]

Error message:

[Screenshot 2024-05-09 111417]

@scottf
Contributor

scottf commented May 10, 2024

This example works when running a cluster directly on my machine.
I'm almost 100% sure this is a Docker networking issue, and a different issue from the one originally reported, which had to do with a failure during async calls that ended up blocking.
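
For anyone who wants to check the network path before digging into client code, here is a rough sketch of a reachability probe that uses only the JDK. It relies on the fact that a NATS server sends an INFO line as soon as a client connects; the class name `NatsPortProbe`, the method `firstLine`, and the 2 second timeout are made up for illustration, and `localhost:4222` is simply the address from the example above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NatsPortProbe {
    // Returns the first line the server sends after connecting,
    // or null if the connection or the read fails or times out.
    static String firstLine(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            return in.readLine();
        } catch (IOException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String line = firstLine("localhost", 4222, 2000);
        if (line == null) {
            System.out.println("No connection or no greeting from localhost:4222");
        } else if (line.startsWith("INFO ")) {
            System.out.println("A NATS server answered on localhost:4222");
        } else {
            System.out.println("Something answered, but not with a NATS INFO line");
        }
    }
}
```

Reading the greeting checks a little more than whether the port is open, but it says nothing about whether JetStream is enabled on the server, so a passing probe only rules out the most basic connectivity problems.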
