Skip to content

Commit

Permalink
Merge pull request #731 from rabbitmq/rabbitmq-java-client-730-consum…
Browse files Browse the repository at this point in the history
…er-work-service-available-processors

Use available processor number for default thread count in consumer work service
  • Loading branch information
michaelklishin committed Mar 30, 2022
2 parents 2720301 + 49c5e2c commit 56cd705
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/main/java/com/rabbitmq/client/impl/ConsumerWorkService.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
Expand All @@ -22,19 +22,26 @@
import java.util.concurrent.ThreadFactory;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final public class ConsumerWorkService {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerWorkService.class);
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.availableProcessors());
private final ExecutorService executor;
private final boolean privateExecutor;
private final WorkPool<Channel, Runnable> workPool;
private final int shutdownTimeout;

public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
this.privateExecutor = (executor == null);
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
: executor;
if (executor == null) {
LOGGER.debug("Creating executor service with {} thread(s) for consumer work service", DEFAULT_NUM_THREADS);
this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory);
} else {
this.executor = executor;
}
this.workPool = new WorkPool<>(queueingTimeout);
this.shutdownTimeout = shutdownTimeout;
}
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/Utils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) 2022 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl;

final class Utils {

private static final int AVAILABLE_PROCESSORS =
Integer.parseInt(
System.getProperty(
"rabbitmq.amqp.client.availableProcessors",
String.valueOf(Runtime.getRuntime().availableProcessors())));

static int availableProcessors() {
return AVAILABLE_PROCESSORS;
}

private Utils() {}
}

0 comments on commit 56cd705

Please sign in to comment.