Skip to content

Commit

Permalink
[#1711] feat(server): Introduce the reconfigurable conf (#1712)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Introduce the reconfigurable conf for server or other componts.

### Why are the changes needed?

Fix: #1711

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.
  • Loading branch information
zuston committed May 22, 2024
1 parent 168cf74 commit 5dd4eef
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.ThreadUtils;

import static org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;

public class ReconfigurableConfManager<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReconfigurableConfManager.class);

private static ReconfigurableConfManager reconfigurableConfManager;

private RssConf rssConf;
private ScheduledExecutorService scheduledThreadPoolExecutor;
private List<ConfigOption<T>> updateConfOptions;

private long latestModificationTimestamp;

private ReconfigurableConfManager(RssConf rssConf, String rssConfFilePath, Class confCls) {
Supplier<RssConf> confSupplier = getConfFromFile(rssConfFilePath, confCls);
initialize(rssConf, confSupplier);
}

private ReconfigurableConfManager(RssConf rssConf, Supplier<RssConf> confSupplier) {
initialize(rssConf, confSupplier);
}

private void initialize(RssConf rssConf, Supplier<RssConf> confSupplier) {
this.rssConf = new RssConf(rssConf);
if (confSupplier != null) {
this.updateConfOptions = new ArrayList<>();
this.scheduledThreadPoolExecutor =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("Refresh-rss-conf");
LOGGER.info("Starting scheduled reconfigurable conf checker...");
scheduledThreadPoolExecutor.scheduleAtFixedRate(
() -> {
try {
RssConf latestConf = confSupplier.get();
update(latestConf);
} catch (Exception e) {
LOGGER.error("Errors on refreshing the rss conf.", e);
}
},
1,
rssConf.get(RSS_RECONFIGURE_INTERVAL_SEC),
TimeUnit.SECONDS);
}
}

private Supplier<RssConf> getConfFromFile(String rssConfFilePath, Class confCls) {
return () -> {
File confFile = new File(rssConfFilePath);
if (confFile.exists() && confFile.isFile()) {
long lastModified = confFile.lastModified();
if (lastModified > latestModificationTimestamp) {
latestModificationTimestamp = lastModified;
RssBaseConf conf = new RssBaseConf();
conf.loadConfFromFile(rssConfFilePath, ConfigUtils.getAllConfigOptions(confCls));
return conf;
}
}
LOGGER.info("Rss conf file:{} is invalid. Ignore updating.", rssConfFilePath);
return null;
};
}

private void update(RssConf latestConf) {
if (latestConf == null) {
return;
}
for (ConfigOption<T> configOption : updateConfOptions) {
T val = latestConf.get(configOption);
if (!Objects.equals(val, rssConf.get(configOption))) {
LOGGER.info(
"Update the config option: {} from {} -> {}",
configOption.key(),
val,
rssConf.get(configOption));
rssConf.set(configOption, val);
}
}
}

private RssConf getConfRef() {
return rssConf;
}

private void registerInternal(ConfigOption<T> configOption) {
this.updateConfOptions.add(configOption);
}

public static void init(RssConf rssConf, String rssConfFilePath) {
ReconfigurableConfManager manager =
new ReconfigurableConfManager(rssConf, rssConfFilePath, rssConf.getClass());
reconfigurableConfManager = manager;
}

@VisibleForTesting
protected static void initForTest(RssConf rssConf, Supplier<RssConf> confSupplier) {
ReconfigurableConfManager manager = new ReconfigurableConfManager(rssConf, confSupplier);
reconfigurableConfManager = manager;
}

public static <T> Reconfigurable<T> register(RssConf conf, ConfigOption<T> configOption) {
if (reconfigurableConfManager == null) {
LOGGER.warn(
"{} is not initialized. The conf of [{}] will not be updated.",
ReconfigurableConfManager.class.getSimpleName(),
configOption.key());
return new FixedReconfigurable<>(conf, configOption);
}

reconfigurableConfManager.registerInternal(configOption);
Reconfigurable<T> reconfigurable =
new Reconfigurable<T>(reconfigurableConfManager, configOption);
return reconfigurable;
}

public static class FixedReconfigurable<T> extends Reconfigurable<T> {
RssConf conf;
ConfigOption<T> option;

FixedReconfigurable(RssConf conf, ConfigOption<T> option) {
this.conf = conf;
this.option = option;
}

@Override
public T get() {
return conf.get(option);
}

@Override
public long getSizeAsBytes() {
return conf.getSizeAsBytes((ConfigOption<Long>) option);
}
}

public static class Reconfigurable<T> {
ReconfigurableConfManager reconfigurableConfManager;
ConfigOption<T> option;

Reconfigurable() {}

Reconfigurable(ReconfigurableConfManager reconfigurableConfManager, ConfigOption<T> option) {
this.reconfigurableConfManager = reconfigurableConfManager;
this.option = option;
}

public T get() {
return reconfigurableConfManager.getConfRef().get(option);
}

public long getSizeAsBytes() {
return reconfigurableConfManager.getConfRef().getSizeAsBytes((ConfigOption<Long>) option);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.common.config.RssConf;

import static org.apache.uniffle.common.config.RssBaseConf.JETTY_HTTP_PORT;
import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_BASE_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

public class ReconfigurableConfManagerTest {

@Test
public void test() throws InterruptedException {
AtomicInteger i = new AtomicInteger(0);
Supplier<RssConf> supplier =
() -> {
if (i.getAndIncrement() <= 1) {
return new RssConf();
}
RssConf conf = new RssConf();
conf.set(JETTY_HTTP_PORT, 100);
conf.set(RPC_SERVER_PORT, 200);
conf.set(RSS_STORAGE_BASE_PATH, Arrays.asList("/d1"));
return conf;
};

RssConf base = new RssConf();
base.set(RSS_RECONFIGURE_INTERVAL_SEC, 1L);
ReconfigurableConfManager.initForTest(base, supplier);

ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
ReconfigurableConfManager.Reconfigurable<Integer> rpcReconfigurable =
ReconfigurableConfManager.register(base, RPC_SERVER_PORT);
ReconfigurableConfManager.Reconfigurable<List<String>> typeReconfigurable =
ReconfigurableConfManager.register(base, RSS_STORAGE_BASE_PATH);
assertEquals(19998, portReconfigurable.get());
assertEquals(19999, rpcReconfigurable.get());
assertNull(typeReconfigurable.get());

Awaitility.await()
.timeout(5, TimeUnit.SECONDS)
.until(() -> portReconfigurable.get().equals(100));
assertEquals(200, rpcReconfigurable.get());
assertEquals(Arrays.asList("/d1"), typeReconfigurable.get());
}

@Test
public void testWithoutInitialization() {
RssConf base = new RssConf();
base.set(JETTY_HTTP_PORT, 100);
ReconfigurableConfManager.Reconfigurable<Integer> portReconfigurable =
ReconfigurableConfManager.register(base, JETTY_HTTP_PORT);
assertEquals(100, portReconfigurable.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import picocli.CommandLine;

import org.apache.uniffle.common.Arguments;
import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ServerStatus;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.InvalidRequestException;
Expand Down Expand Up @@ -122,6 +123,8 @@ public static void main(String[] args) throws Exception {
LOG.info("Start to init shuffle server using config {}", configFile);

ShuffleServerConf shuffleServerConf = new ShuffleServerConf(configFile);
ReconfigurableConfManager.init(shuffleServerConf, configFile);

final ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
shuffleServer.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.ReconfigurableConfManager;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class ShuffleBufferManager {
// reduce small I/Os to persistent storage, especially for local HDDs.
private long shuffleFlushThreshold;
// Huge partition vars
private long hugePartitionSizeThreshold;
private ReconfigurableConfManager.Reconfigurable<Long> hugePartitionSizeThresholdRef;
private long hugePartitionMemoryLimitSize;

protected long bufferSize = 0;
Expand Down Expand Up @@ -126,8 +127,8 @@ public ShuffleBufferManager(
conf.getSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD);
this.shuffleFlushThreshold =
conf.getSizeAsBytes(ShuffleServerConf.SERVER_SHUFFLE_FLUSH_THRESHOLD);
this.hugePartitionSizeThreshold =
conf.getSizeAsBytes(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionSizeThresholdRef =
ReconfigurableConfManager.register(conf, ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD);
this.hugePartitionMemoryLimitSize =
Math.round(
capacity * conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
Expand Down Expand Up @@ -702,16 +703,16 @@ public void removeBufferByShuffleId(String appId, Collection<Integer> shuffleIds
boolean isHugePartition(String appId, int shuffleId, int partitionId) {
return shuffleTaskManager != null
&& shuffleTaskManager.getPartitionDataSize(appId, shuffleId, partitionId)
> hugePartitionSizeThreshold;
> hugePartitionSizeThresholdRef.getSizeAsBytes();
}

public boolean isHugePartition(long usedPartitionDataSize) {
return usedPartitionDataSize > hugePartitionSizeThreshold;
return usedPartitionDataSize > hugePartitionSizeThresholdRef.getSizeAsBytes();
}

public boolean limitHugePartition(
String appId, int shuffleId, int partitionId, long usedPartitionDataSize) {
if (usedPartitionDataSize > hugePartitionSizeThreshold) {
if (usedPartitionDataSize > hugePartitionSizeThresholdRef.getSizeAsBytes()) {
long memoryUsed = getShuffleBufferEntry(appId, shuffleId, partitionId).getValue().getSize();
if (memoryUsed > hugePartitionMemoryLimitSize) {
LOG.warn(
Expand Down

0 comments on commit 5dd4eef

Please sign in to comment.