Skip to content

Commit

Permalink
make tiflash scale-in faster (#8432)
Browse files Browse the repository at this point in the history
close #8433
  • Loading branch information
guo-shaoge committed Nov 30, 2023
1 parent 83c2256 commit e5eaf31
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 95 deletions.
7 changes: 3 additions & 4 deletions dbms/src/Common/Config/ConfigReloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ConfigReloader::~ConfigReloader()
{
quit = true;

cv.notify_all();
if (thread.joinable())
thread.join();
}
Expand All @@ -62,18 +63,16 @@ void ConfigReloader::run()

while (true)
{
if (quit)
std::unique_lock lock(reload_mutex);
if (cv.wait_for(lock, reload_interval, [this]() { return quit.load(); }))
return;

std::this_thread::sleep_for(reload_interval);
reloadIfNewer(false, /* throw_on_error = */ false);
}
}

void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error)
{
std::lock_guard lock(reload_mutex);

FilesChangesTracker new_files = getNewFileList();
bool config_object_updated = false;
for (const auto & conf : config_objects)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/Config/ConfigReloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class ConfigReloader

std::atomic_bool quit{false};
std::thread thread;

std::condition_variable cv;
};

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ namespace DB
{
void LocalAdmissionController::warmupResourceGroupInfoCache(const std::string & name)
{
assert(!stopped);
if (unlikely(stopped))
return;

if (name.empty())
return;
Expand Down
92 changes: 53 additions & 39 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ class LocalAdmissionController final : private boost::noncopyable

uint64_t estWaitDuraMS(const std::string & name) const
{
if (unlikely(stopped))
return 0;

if (name.empty())
return 0;

Expand All @@ -472,7 +475,8 @@ class LocalAdmissionController final : private boost::noncopyable

std::optional<uint64_t> getPriority(const std::string & name)
{
assert(!stopped);
if (unlikely(stopped))
return {HIGHEST_RESOURCE_GROUP_PRIORITY};

if (name.empty())
return {HIGHEST_RESOURCE_GROUP_PRIORITY};
Expand All @@ -495,7 +499,9 @@ class LocalAdmissionController final : private boost::noncopyable

void registerRefillTokenCallback(const std::function<void()> & cb)
{
assert(!stopped);
if (unlikely(stopped))
return;

// NOTE: Better not use lock inside refill_token_callback,
// because LAC needs to lock when calling refill_token_callback,
// which may introduce dead lock.
Expand All @@ -506,53 +512,22 @@ class LocalAdmissionController final : private boost::noncopyable

void unregisterRefillTokenCallback()
{
assert(!stopped);
if (unlikely(stopped))
return;

std::lock_guard lock(mu);
RUNTIME_CHECK_MSG(refill_token_callback != nullptr, "callback cannot be nullptr before unregistering");
refill_token_callback = nullptr;
}

#ifdef DBMS_PUBLIC_GTEST
static std::unique_ptr<MockLocalAdmissionController> global_instance;
#else
static std::unique_ptr<LocalAdmissionController> global_instance;
#endif

// Interval of fetch from GAC periodically.
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

private:
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
void stop()
{
assert(!stopped);

// When tidb_enable_resource_control is disabled, resource group name is empty.
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(name);
if unlikely (!group)
if (stopped)
{
LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
LOG_DEBUG(log, "LAC already stopped");
return;
}

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
{
{
std::lock_guard lock(mu);
low_token_resource_groups.insert(name);
}
cv.notify_one();
}
}

void stop()
{
if (stopped)
return;
stopped.store(true);

// TryCancel() is thread safe(https://github.com/grpc/grpc/pull/30416).
Expand Down Expand Up @@ -602,6 +577,45 @@ class LocalAdmissionController final : private boost::noncopyable
getCurrentExceptionMessage(false));
}
}
LOG_INFO(log, "LAC stopped done: final report size: {}", acquire_infos.size());
}

#ifdef DBMS_PUBLIC_GTEST
static std::unique_ptr<MockLocalAdmissionController> global_instance;
#else
static std::unique_ptr<LocalAdmissionController> global_instance;
#endif

// Interval of fetch from GAC periodically.
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

private:
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
{
if (unlikely(stopped))
return;

// When tidb_enable_resource_control is disabled, resource group name is empty.
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(name);
if unlikely (!group)
{
LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
return;
}

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
{
{
std::lock_guard lock(mu);
low_token_resource_groups.insert(name);
}
cv.notify_one();
}
}

// If we cannot get GAC resp for DEGRADE_MODE_DURATION seconds, enter degrade mode.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/MetricsPrometheus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ MetricsPrometheus::MetricsPrometheus(Context & context, const AsynchronousMetric

MetricsPrometheus::~MetricsPrometheus()
{
timer.cancel(true);
timer.cancel(false);
}

void MetricsPrometheus::run()
Expand Down
110 changes: 60 additions & 50 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1562,53 +1562,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
});

auto & tmt_context = global_context->getTMTContext();

// For test mode, TaskScheduler and LAC is controlled by test case.
// TODO: resource control is not supported for WN. So disable pipeline model and LAC.
const bool init_pipeline_and_lac
= !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
if (init_pipeline_and_lac)
{
#ifdef DBMS_PUBLIC_GTEST
LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
#else
LocalAdmissionController::global_instance
= std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
#endif

auto get_pool_size = [](const auto & setting) {
return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
};
TaskSchedulerConfig config{
{get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
settings.pipeline_cpu_task_thread_pool_queue_type},
{get_pool_size(settings.pipeline_io_task_thread_pool_size),
settings.pipeline_io_task_thread_pool_queue_type},
};
RUNTIME_CHECK(!TaskScheduler::instance);
TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
}

SCOPE_EXIT({
if (init_pipeline_and_lac)
{
assert(TaskScheduler::instance);
TaskScheduler::instance.reset();
assert(LocalAdmissionController::global_instance);
LocalAdmissionController::global_instance.reset();
}
});

if (settings.enable_async_grpc_client)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

// FIXME: (bootstrap) we should bootstrap the tiflash node more early!
if (global_context->getSharedContextDisagg()->notDisaggregatedMode()
|| /*has_been_bootstrap*/ store_ident.has_value())
Expand All @@ -1629,9 +1582,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
wn_ps->waitUntilInitedFromRemoteStore();
}

/// Then, startup grpc server to serve raft and/or flash services.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

{
TcpHttpServersHolder tcpHttpServersHolder(*this, settings, log);

Expand Down Expand Up @@ -1678,6 +1628,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto metrics_prometheus = std::make_unique<MetricsPrometheus>(*global_context, async_metrics);

SessionCleaner session_cleaner(*global_context);
auto & tmt_context = global_context->getTMTContext();

if (proxy_conf.is_proxy_runnable)
{
Expand Down Expand Up @@ -1792,6 +1743,65 @@ int Server::main(const std::vector<std::string> & /*args*/)
GET_METRIC(tiflash_server_info, start_time).Set(ts.epochTime());
}

// For test mode, TaskScheduler and LAC is controlled by test case.
// TODO: resource control is not supported for WN. So disable pipeline model and LAC.
const bool init_pipeline_and_lac
= !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
if (init_pipeline_and_lac)
{
#ifdef DBMS_PUBLIC_GTEST
LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
#else
LocalAdmissionController::global_instance
= std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
#endif

auto get_pool_size = [](const auto & setting) {
return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
};
TaskSchedulerConfig config{
{get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
settings.pipeline_cpu_task_thread_pool_queue_type},
{get_pool_size(settings.pipeline_io_task_thread_pool_size),
settings.pipeline_io_task_thread_pool_queue_type},
};
RUNTIME_CHECK(!TaskScheduler::instance);
TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
}

SCOPE_EXIT({
if (init_pipeline_and_lac)
{
assert(TaskScheduler::instance);
TaskScheduler::instance.reset();
assert(LocalAdmissionController::global_instance);
LocalAdmissionController::global_instance.reset();
}
});

if (settings.enable_async_grpc_client)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

/// startup grpc server to serve raft and/or flash services.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

SCOPE_EXIT({
// Stop LAC for AutoScaler managed CN before FlashGrpcServerHolder is destructed.
// Because AutoScaler it will kill tiflash process when port of flash_server_addr is down.
// And we want to make sure LAC is cleanedup.
// The effects are there will be no resource control during [lac.stop(), FlashGrpcServer destruct done],
// but it's basically ok, that duration is small(normally 100-200ms).
if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() && use_autoscaler
&& LocalAdmissionController::global_instance)
LocalAdmissionController::global_instance->stop();
});

tmt_context.setStatusRunning();

try
Expand Down

0 comments on commit e5eaf31

Please sign in to comment.