[BUG] Timeout on consuming message from azure service bus through Application Gateway #40020

Open
fanemama opened this issue May 3, 2024 · 12 comments
Labels
  • Client: This issue points to a problem in the data-plane of the library.
  • customer-reported: Issues that are reported by GitHub users external to the Azure organization.
  • needs-team-attention: This issue needs attention from Azure service team or SDK team.
  • question: The issue doesn't require a change to the product in order to be resolved. Most issues start as that.
  • Service Bus

Comments

@fanemama

fanemama commented May 3, 2024

Context:
We are using Azure Service Bus via an Application Gateway (custom endpoint) with the transport type AmqpTransportType.AMQP_WEB_SOCKETS.
Our consumer regularly encounters a connection timeout, after which the application stops consuming messages.
We are constantly forced to restart the application to resume consuming messages.

Do you have a solution for our issue, and an explanation of this behaviour?

Stack trace:

{"az.sdk.message":"Error occurred while refreshing token that is not retriable. Not scheduling refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.","exception":"Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)","scopes":["https://*******"],"audience":"amqp://"}

{"az.sdk.message":"Timeout waiting for RemoteClose. Manually terminating EndpointStates and completing close.","connectionId":"MF_a2709f_1714114657307","entityPath":"","linkName":""}

{"az.sdk.message":"onLinkRemoteClose","connectionId":"MF_6ffaaf_1714548329200","errorCondition":"amqp:link:detach-forced","errorDescription":"The link 'G14:5461966: ' is force detached. Code: publisher(link162650). Details: AmqpMessagePublisher.IdleTimerExpired: Idle timeout: 00:10:00.","linkName":"","entityPath":"***********"}

@github-actions github-actions bot added the Client, customer-reported, needs-team-attention, question and Service Bus labels May 3, 2024

github-actions bot commented May 3, 2024

@anuchandy @conniey @lmolkova


github-actions bot commented May 3, 2024

Thank you for your feedback. Tagging and routing to the team member best able to assist.

@anuchandy
Member

Hello @fanemama, the first error message indicates that the background scheduled task responsible for renewing the auth token at regular intervals failed to do so at some point. If this happens often, it may indicate an unstable network between the application running the consumer and the AD endpoint or the broker. I wonder if there is a restricted network, a proxy, or some kind of firewall rule that causes the connection to be dropped often.
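The log says the refresh task was not rescheduled after a "not retriable" error (the "Could not emit tick ... due to lack of requests" text appears to come from Reactor's `Flux.interval` overflow check), so once the current token expires nothing renews it. To illustrate the difference between stopping and rescheduling, here is a minimal JDK-only sketch of a refresh loop that treats every failure as retriable and backs off. `TokenRefresher`, the `fetchToken` supplier, the 80% refresh point and the backoff schedule are all hypothetical assumptions; this is not how the SDK's `ActiveClientTokenManager` is implemented.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical illustration, NOT the SDK's ActiveClientTokenManager: a token
// refresh loop that treats every failure as retriable and reschedules itself.
final class TokenRefresher {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // Assumed token source: returns the new token's lifetime, throws on failure.
    private final Supplier<Duration> fetchToken;
    private final Duration maxBackoff;
    // Only touched on the single scheduler thread, so no synchronization is needed.
    private int consecutiveFailures;

    TokenRefresher(Supplier<Duration> fetchToken, Duration maxBackoff) {
        this.fetchToken = fetchToken;
        this.maxBackoff = maxBackoff;
    }

    // Refresh at 80% of the lifetime so a slow refresh still completes before expiry.
    static Duration refreshDelay(Duration lifetime) {
        return lifetime.multipliedBy(8).dividedBy(10);
    }

    // Capped exponential backoff after the n-th consecutive failure: 1 s, 2 s, 4 s, ...
    static Duration backoff(int failures, Duration maxBackoff) {
        int exponent = Math.max(0, Math.min(failures - 1, 30));
        Duration delay = Duration.ofSeconds(1L << exponent);
        return delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay;
    }

    void start() {
        schedule(Duration.ZERO);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    private void schedule(Duration delay) {
        scheduler.schedule(this::refresh, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        Duration next;
        try {
            next = refreshDelay(fetchToken.get());
            consecutiveFailures = 0;
        } catch (RuntimeException e) {
            // Instead of giving up (as the log above reports), try again after a backoff.
            consecutiveFailures++;
            next = backoff(consecutiveFailures, maxBackoff);
        }
        schedule(next);
    }
}
```

The design choice being illustrated is only that a transient network failure during renewal leads to another attempt rather than a permanently unscheduled task.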

@fanemama
Author

fanemama commented May 4, 2024

Hi @anuchandy,
To give you more context: none of our applications developed in .NET or Python have this issue. We only see this error with applications developed in Java and running in Docker containers. In addition, before creating this issue, we had already contacted Azure support and had many meetings to check our network; everything seemed fine and they didn't detect anything unusual.
The Azure support ticket information is below:
 - Timeout when using the application gateway ... - TrackingID#2402210050003918 

The conclusion of the various meetings with support was that it is not a network issue.

@anuchandy
Member

@fanemama, let me prepare a setup (SB resource behind app-gateway and SDK consumer running in a Docker instance) to see if this can be reproduced.

@lazhar

lazhar commented May 7, 2024

FYI @anuchandy, I will now take the lead on this, helping @fanemama.

Thank you for preparing this setup.

@lazhar

lazhar commented May 13, 2024

Hi @anuchandy, any news on this? Thanks.

@anuchandy
Member

Hi @lazhar, I’ve been looking into this. One thing I noticed is that if the gateway front end sends a FIN+ACK followed by a TCP RST, the underlying proton-j library does not signal the connection termination to the application. I’ve created an issue in that project’s JIRA: [PROTON-2823] Proton-J does not raise transport closed when TCP FIN+ACK arrives followed by TCP RST - ASF JIRA (apache.org). I’m not sure whether your environment is affected by this.
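If the transport close is never surfaced, the processor can keep reporting itself as running on a dead connection, so a check based only on isRunning() would not fire. One application-side mitigation is to track the time of the last observed activity and treat a long silence as suspicious. The following is a minimal JDK-only sketch of such a watchdog; `ActivityWatchdog`, `touch()`, `isStale()` and the injected clock are hypothetical names, not part of the Service Bus SDK.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical application-level watchdog; not part of the Service Bus SDK.
// Call touch() whenever the consumer observes activity (e.g. at the top of processMessage).
final class ActivityWatchdog {
    private final Supplier<Instant> clock;
    private final Duration maxIdle;
    private final AtomicReference<Instant> lastActivity;

    ActivityWatchdog(Supplier<Instant> clock, Duration maxIdle) {
        this.clock = clock;
        this.maxIdle = maxIdle;
        this.lastActivity = new AtomicReference<>(clock.get());
    }

    // Record activity; safe to call from the processor's callback threads.
    void touch() {
        lastActivity.set(clock.get());
    }

    // True when no activity was seen for longer than maxIdle. On a quiet subscription
    // this is also true for a healthy connection, so maxIdle must exceed the normal quiet period.
    boolean isStale() {
        return Duration.between(lastActivity.get(), clock.get()).compareTo(maxIdle) > 0;
    }
}
```

In production the clock would simply be `Instant::now`; injecting it keeps the logic testable. A stale result is only a hint, so it is best combined with isRunning() rather than used alone.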

Here are a few observations that may be helpful:

  1. Application Gateway has a request time-out setting (under Backend settings for port 443); setting it to a higher value (30-60 seconds) should reduce the chance of disconnection.
  2. There is an AMQP system property that makes the client send heartbeats to the broker (via the gateway), which can help keep the connection from idling. The property can be passed as a VM option, e.g. -Dqpid.heartbeat=10; I can see that this makes the underlying proton-j library send empty frames to the broker (doc). The references advise against setting it to a very low value; I tried 10-20 seconds.

Also, could you share the Service Bus SDK version, the authentication mode (e.g., connection string) and what the receive code looks like?

@glmva

glmva commented May 15, 2024

Hello @anuchandy,

Thank you very much for your help and feedback.
I'm also working with @lazhar and @fanemama on this issue.

On the infrastructure/network side, we made the same observation about the FIN+ACK and TCP RST, and you have confirmed my assumption that the application does not receive the signal and therefore doesn't try to re-establish the connection.
Do you have an idea which service initiates these FIN+ACK + TCP RST, and why? Do you think we should also focus on this?

We have already set the timeout on Application Gateway to 120 s (following this note: https://learn.microsoft.com/en-us/azure/application-gateway/application-gateway-websocket#backendaddresspool-backendhttpsetting-and-routing-rule-configuration). 120 s is double the default lock duration of ASB messages; we chose it to be on the safe side, even if it is not optimal. Indeed, we noticed a small improvement on some applications.

I'll let @lazhar answer regarding the code details.

Thank you again for your help.

@lazhar

lazhar commented May 15, 2024

Hi @anuchandy, thank you for your help and for opening the ticket about Proton-J.

We will try to set the heartbeat option in our Java applications and see if it helps.

We are also checking whether this heartbeat option can be set directly in code.
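A possible in-code equivalent of the `-Dqpid.heartbeat` VM option is to set the same system property programmatically at startup. This is a minimal sketch under an unverified assumption taken from the comment above: that the AMQP stack reads `qpid.heartbeat` when it creates a connection, so the property must be set before any Service Bus client is built. `HeartbeatConfig` and `applyDefault` are hypothetical names.

```java
// Hypothetical startup helper. Assumption (from the thread, not verified here):
// the AMQP stack reads the "qpid.heartbeat" system property when a connection is
// created, so this must run before any Service Bus client is built.
final class HeartbeatConfig {
    static final String HEARTBEAT_PROPERTY = "qpid.heartbeat";

    // Sets the heartbeat interval in seconds unless -Dqpid.heartbeat was already
    // passed on the command line, in which case that value is kept and returned.
    static String applyDefault(int seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        String existing = System.getProperty(HEARTBEAT_PROPERTY);
        if (existing != null) {
            return existing;
        }
        String value = Integer.toString(seconds);
        System.setProperty(HEARTBEAT_PROPERTY, value);
        return value;
    }
}
```

In a Spring Boot application this would be the first statement in `main`, before `SpringApplication.run(...)`, since `@Bean` methods may already create clients. Letting an explicit VM option win keeps the command line authoritative.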

@lazhar

lazhar commented May 15, 2024

@anuchandy, I saw this option in the .NET SDK. Is it the heartbeat we are talking about?

Do you know the equivalent in the Java SDK?

@lazhar

lazhar commented May 15, 2024

@anuchandy and to answer your questions:

  • Service Bus SDK version:
    • for the first project: spring-cloud-azure-starter --> 5.5.0 and azure-messaging-servicebus --> 7.14.3
    • for the second project: spring-cloud-azure-starter --> 5.11.0 and azure-messaging-servicebus --> 7.15.2
  • mode of authentication: managed identity
  • what the receive code looks like (second project):
// In a @Configuration class
@Bean
public ServiceBusClientBuilder serviceBusClient() {
   return new ServiceBusClientBuilder()
         .fullyQualifiedNamespace(fullyQualifiedName)
         .customEndpointAddress(customEndpoint)
         .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
         .credential(new ClientSecretCredentialBuilder().clientId(clientId).tenantId(tenantId).clientSecret(secretId).build());
}

// In a @Service class
@PostConstruct
public void receiveMessages() {
   ServiceBusProcessorClient mainService = AzureBusUtility.createServiceBusProcessorClient(serviceBusClientBuilder, topicName, subscriptionName, this::processMessage, this::processError).buildProcessorClient();

   try {
      mainService.start();
      log.info("Starting the processor");
      // Every 30 seconds, restart the processor if it reports that it is no longer running
      Schedulers.boundedElastic().schedulePeriodically(() -> {
         log.info("Is main service still running: {}", mainService.isRunning());
         AzureBusUtility.isServiceRunning(mainService);
      }, 30L, 30L, TimeUnit.SECONDS);
   } catch (ServiceBusException serviceBusException) {
      AzureBusUtility.isServiceRunning(mainService);
   }
}

// In AzureBusUtility: returns the processor builder; the caller invokes buildProcessorClient()
public static ServiceBusClientBuilder.ServiceBusProcessorClientBuilder createServiceBusProcessorClient(ServiceBusClientBuilder serviceBusClientBuilder, String topicName, String subscriptionName,
      Consumer<ServiceBusReceivedMessageContext> processMessage, Consumer<ServiceBusErrorContext> processError) {
   return serviceBusClientBuilder.processor().topicName(topicName).subscriptionName(subscriptionName).maxConcurrentCalls(1).maxAutoLockRenewDuration(Duration.ofMinutes(2L)).receiveMode(ServiceBusReceiveMode.PEEK_LOCK).disableAutoComplete()
         .processMessage(processMessage).processError(processError);
}

// In AzureBusUtility: stop, close and restart the processor if it is no longer running
protected static void isServiceRunning(ServiceBusProcessorClient service) {
   if (!service.isRunning()) {
      service.stop();
      log.info("Stopping the service");
      service.close();
      log.info("Closing the service");
      service.start();
      log.info("Restart the service");
   }
}

void processMessage(ServiceBusReceivedMessageContext context) {
   final ServiceBusReceivedMessage message = context.getMessage();

   try {
      log.info("Processing message. Id: {}, Sequence #: {}. Contents: {}", message.getMessageId(), message.getSequenceNumber(), message.getBody());
      context.complete();
   } catch (Exception processingException) {
      // Pass the exception as the last argument so SLF4J logs its stack trace
      log.error("{}{} : Processing exception", message.getMessageId(), ERROR_MESSAGE, processingException);
      context.deadLetter(AzureBusUtility.prepareDeadLetterOptions(processingException, message, message.getMessageId() + ":Processing exception"));
      log.info(message.getMessageId() + DLQ_MESSAGE);
   }
}

void processError(ServiceBusErrorContext context) {
   if (!(context.getException() instanceof ServiceBusException exception)) {
      log.error("Non-ServiceBusException occurred", context.getException());
      return;
   }
   log.error("ServiceBusException source: {}. Reason: {}. Is transient? {}.", context.getErrorSource(), exception.getReason(), exception.isTransient(), context.getException());
}
  • here is also what the sending code looks like (first project):
@Configuration
@Slf4j
public class AzureServiceBusConfiguration {

   @Value("${spring.cloud.azure.profile.tenant-id}")
   private String tenantId;
   @Value("${spring.cloud.azure.credential.client-id}")
   private String clientId;
   @Value("${spring.cloud.azure.credential.client-secret}")
   private String secretId;
   @Value("${spring.cloud.azure.servicebus.entity-name}")
   private String topicName;
   @Value("${spring.cloud.azure.servicebus.processor.subscription-name}")
   private String subscriptionName;
   @Value("${fully.qualified.name}")
   private String fullyQualifiedName;
   @Value("${custom.endpoint}")
   private String customEndpoint;


   @Primary
   @Bean(name = "SendBusMessage")
   ServiceBusSenderClient sendBusMessage() {
      ServiceBusClientBuilder builder = new ServiceBusClientBuilder().fullyQualifiedNamespace(fullyQualifiedName).customEndpointAddress(customEndpoint).transportType(AmqpTransportType.AMQP_WEB_SOCKETS).credential(getAuth().build());

      return builder.sender().topicName(topicName).buildClient();
   }

   private ClientSecretCredentialBuilder getAuth() {
      return new ClientSecretCredentialBuilder().clientId(clientId).tenantId(tenantId).clientSecret(secretId);
   }

}
@Slf4j
@Service("AzureBusService")
public class AzureBusServiceImpl implements AzureBusService {

   private static final String BUS_PROPERTIES_CLAIM_KEY = "claimNumber";
   private final ServiceBusSenderClient senderClient;

   public AzureBusServiceImpl(@Qualifier("SendBusMessage") ServiceBusSenderClient senderClient) {
      this.senderClient = senderClient;
   }

   @Override
   public void sendMessage(String subject, String message, String messageSessionId, String claimNumber) {
      log.info("Writing message \"{}\" in the Azure Bus topic \"{}\"", message, senderClient.getEntityPath());

      ServiceBusMessage busMessage = new ServiceBusMessage(message).setSessionId(messageSessionId).setSubject(subject);
      busMessage.getApplicationProperties().put(BUS_PROPERTIES_CLAIM_KEY, claimNumber);

      senderClient.sendMessage(busMessage);
   }

   @PreDestroy
   public void preDestroy() {
      senderClient.close();
   }

}
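The periodic check in the receive code restarts the processor only when isRunning() is false and does so on every 30-second tick. Given the proton-j behaviour described earlier in the thread, a dead connection may not flip isRunning(), and repeated stop/close/start cycles can churn if the restart itself keeps failing. Below is a minimal JDK-only sketch of a restart decision that also accepts a staleness signal (for example, time since the last received message) and enforces a minimum gap between restarts. `RestartPolicy` and its parameters are hypothetical, not an SDK API.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical restart policy for the periodic health check; not an SDK API.
// isRunning() alone may miss half-closed connections, so staleness also counts,
// and a minimum gap between restarts avoids stop/close/start churn on every tick.
final class RestartPolicy {
    private final Duration minGapBetweenRestarts;
    private Instant lastRestart = Instant.EPOCH;

    RestartPolicy(Duration minGapBetweenRestarts) {
        this.minGapBetweenRestarts = minGapBetweenRestarts;
    }

    // Returns true when the caller should restart now, and records the restart time.
    synchronized boolean shouldRestart(boolean isRunning, boolean isStale, Instant now) {
        boolean unhealthy = !isRunning || isStale;
        boolean gapElapsed = Duration.between(lastRestart, now).compareTo(minGapBetweenRestarts) >= 0;
        if (unhealthy && gapElapsed) {
            lastRestart = now;
            return true;
        }
        return false;
    }
}
```

The periodic task would call `shouldRestart(mainService.isRunning(), stale, Instant.now())` and run the existing stop/close/start sequence only when it returns true. The method is synchronized because the same check is invoked both from the scheduler and from the catch block.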

5 participants