Skip to content

Commit

Permalink
Storages: limit memory usage of data sharing (#8567) (#8574)
Browse files Browse the repository at this point in the history
close #8564
  • Loading branch information
ti-chi-bot committed Dec 25, 2023
1 parent 632eb5b commit 9f7e054
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 23 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ namespace DB
F(type_sche_new_task, {"type", "sche_new_task"}), \
F(type_add_cache_succ, {"type", "add_cache_succ"}), \
F(type_add_cache_stale, {"type", "add_cache_stale"}), \
F(type_add_cache_reach_count_limit, {"type", "type_add_cache_reach_count_limit"}), \
F(type_add_cache_total_bytes_limit, {"type", "add_cache_total_bytes_limit"}), \
F(type_get_cache_miss, {"type", "get_cache_miss"}), \
F(type_get_cache_part, {"type", "get_cache_part"}), \
F(type_get_cache_hit, {"type", "get_cache_hit"}), \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ struct Settings
\
M(SettingDouble, dt_page_gc_threshold, 0.5, "Max valid rate of deciding to do a GC in PageStorage") \
M(SettingBool, dt_enable_read_thread, true, "Enable storage read thread or not") \
M(SettingUInt64, dt_max_sharing_column_bytes_for_all, 2048 * Constant::MB, "Memory limitation for data sharing of all requests. 0 means disable data sharing") \
M(SettingUInt64, dt_max_sharing_column_count, 5, "ColumnPtr object limitation for data sharing of each DMFileReader::Stream. 0 means disable data sharing") \
M(SettingBool, dt_enable_bitmap_filter, true, "Use bitmap filter to read data or not") \
M(SettingDouble, dt_read_thread_count_scale, 1.0, "Number of read thread = number of logical cpu cores * dt_read_thread_count_scale. Only has meaning at server startup.") \
M(SettingDouble, dt_filecache_max_downloading_count_scale, 1.0, "Max downloading task count of FileCache = io thread count * dt_filecache_max_downloading_count_scale.") \
Expand Down
20 changes: 18 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/ScanContext.h>

#include <utility>

namespace DB::DM
{
DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & context)
Expand Down Expand Up @@ -54,6 +56,20 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &

bool enable_read_thread = SegmentReaderPoolManager::instance().isSegmentReader();

if (!enable_read_thread || max_sharing_column_bytes_for_all <= 0)
{
// Disable data sharing.
max_sharing_column_count = 0;
}
else if (
shared_column_data_mem_tracker != nullptr
&& std::cmp_greater_equal(shared_column_data_mem_tracker->get(), max_sharing_column_bytes_for_all))
{
// The memory used reaches the limitation by running queries, disable the data sharing for this DMFile
max_sharing_column_count = 0;
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_total_bytes_limit).Increment();
}

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -73,9 +89,9 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(const DMFilePtr &
rows_threshold_per_read,
read_one_pack_every_time,
tracing_id,
enable_read_thread,
max_sharing_column_count,
scan_context);

return std::make_shared<DMFileBlockInputStream>(std::move(reader), enable_read_thread);
return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_count > 0);
}
} // namespace DB::DM
16 changes: 9 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ namespace DM
class DMFileBlockInputStream : public SkippableBlockInputStream
{
public:
explicit DMFileBlockInputStream(DMFileReader && reader_, bool enable_read_thread_)
explicit DMFileBlockInputStream(DMFileReader && reader_, bool enable_data_sharing_)
: reader(std::move(reader_))
, enable_read_thread(enable_read_thread_)
, enable_data_sharing(enable_data_sharing_)
{
if (enable_read_thread)
if (enable_data_sharing)
{
DMFileReaderPool::instance().add(reader);
}
}

~DMFileBlockInputStream() override
{
if (enable_read_thread)
if (enable_data_sharing)
{
DMFileReaderPool::instance().del(reader);
}
Expand All @@ -72,7 +72,7 @@ class DMFileBlockInputStream : public SkippableBlockInputStream
private:
#endif
DMFileReader reader;
bool enable_read_thread;
bool enable_data_sharing;
};

using DMFileBlockInputStreamPtr = std::shared_ptr<DMFileBlockInputStream>;
Expand Down Expand Up @@ -161,7 +161,8 @@ class DMFileBlockInputStreamBuilder
enable_column_cache = settings.dt_enable_stable_column_cache;
aio_threshold = settings.min_bytes_to_use_direct_io;
max_read_buffer_size = settings.max_read_buffer_size;
enable_read_thread = settings.dt_enable_read_thread;
max_sharing_column_bytes_for_all = settings.dt_max_sharing_column_bytes_for_all;
max_sharing_column_count = settings.dt_max_sharing_column_count;
return *this;
}
DMFileBlockInputStreamBuilder & setCaches(const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_)
Expand Down Expand Up @@ -194,7 +195,8 @@ class DMFileBlockInputStreamBuilder
size_t max_read_buffer_size{};
size_t rows_threshold_per_read = DMFILE_READ_ROWS_THRESHOLD;
bool read_one_pack_every_time = false;
bool enable_read_thread = false;
size_t max_sharing_column_bytes_for_all = 0;
size_t max_sharing_column_count = 0;
String tracing_id;
};

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ DMFileReader::DMFileReader(
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
bool enable_col_sharing_cache,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_)
: dmfile(dmfile_)
, read_columns(read_columns_)
Expand Down Expand Up @@ -238,9 +238,9 @@ DMFileReader::DMFileReader(
const auto data_type = dmfile->getColumnStat(cd.id).type;
data_type->enumerateStreams(callback, {});
}
if (enable_col_sharing_cache)
if (max_sharing_column_count > 0)
{
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, log);
col_data_cache = std::make_unique<ColumnSharingCacheMap>(path(), read_columns, max_sharing_column_count, log);
for (const auto & cd : read_columns)
{
last_read_from_cache[cd.id] = false;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class DMFileReader
size_t rows_threshold_per_read_,
bool read_one_pack_every_time_,
const String & tracing_id_,
bool enable_col_sharing_cache,
size_t max_sharing_column_count,
const ScanContextPtr & scan_context_);

Block getHeader() const { return toEmptyBlock(read_columns); }
Expand Down
19 changes: 15 additions & 4 deletions dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,15 @@ class ColumnSharingCache
ColumnPtr col_data;
};

void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data)
void add(size_t start_pack_id, size_t pack_count, ColumnPtr & col_data, size_t max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
std::lock_guard lock(mtx);
if (packs.size() >= max_sharing_column_count)
{
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_reach_count_limit).Increment();
return;
}
GET_METRIC(tiflash_storage_read_thread_counter, type_add_cache_succ).Increment();
auto & value = packs[start_pack_id];
if (value.pack_count < pack_count)
{
Expand Down Expand Up @@ -121,9 +126,14 @@ class ColumnSharingCache
class ColumnSharingCacheMap
{
public:
ColumnSharingCacheMap(const std::string & dmfile_name_, const ColumnDefines & cds, LoggerPtr & log_)
ColumnSharingCacheMap(
const String & dmfile_name_,
const ColumnDefines & cds,
size_t max_sharing_column_count_,
LoggerPtr & log_)
: dmfile_name(dmfile_name_)
, stats(static_cast<int>(ColumnCacheStatus::_TOTAL_COUNT))
, max_sharing_column_count(max_sharing_column_count_)
, log(log_)
{
for (const auto & cd : cds)
Expand Down Expand Up @@ -152,7 +162,7 @@ class ColumnSharingCacheMap
{
return;
}
itr->second.add(start_pack_id, pack_count, col_data);
itr->second.add(start_pack_id, pack_count, col_data, max_sharing_column_count);
}

bool get(int64_t col_id, size_t start_pack_id, size_t pack_count, size_t read_rows, ColumnPtr & col_data, DataTypePtr data_type)
Expand Down Expand Up @@ -207,6 +217,7 @@ class ColumnSharingCacheMap
std::string dmfile_name;
std::unordered_map<int64_t, ColumnSharingCache> cols;
std::vector<std::atomic<int64_t>> stats;
size_t max_sharing_column_count;
LoggerPtr log;
};

Expand Down
60 changes: 54 additions & 6 deletions dbms/src/Storages/DeltaMerge/tests/gtest_column_sharing_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ColumnSharingCache cache;

auto col = createColumn(8);
cache.add(1, 8, col);
cache.add(1, 8, col, std::numeric_limits<UInt64>::max());

ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
Expand All @@ -75,15 +75,15 @@ TEST(ColumnSharingCacheTest, AddAndGet)
ASSERT_EQ(col4, nullptr);

auto col5 = createColumn(7);
cache.add(1, 7, col5);
cache.add(1, 7, col5, std::numeric_limits<UInt64>::max());
ColumnPtr col6;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col6, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col6->size(), 8 * TEST_PACK_ROWS);
compareColumn(col6, col, col6->size());

auto col7 = createColumn(9);
cache.add(1, 9, col7);
cache.add(1, 9, col7, std::numeric_limits<UInt64>::max());
ColumnPtr col8;
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
Expand All @@ -96,13 +96,13 @@ TEST(ColumnSharingCacheTest, Del)
ColumnSharingCache cache;

auto col1 = createColumn(8);
cache.add(1, 8, col1);
cache.add(1, 8, col1, std::numeric_limits<UInt64>::max());

auto col2 = createColumn(8);
cache.add(9, 8, col2);
cache.add(9, 8, col2, std::numeric_limits<UInt64>::max());

auto col3 = createColumn(8);
cache.add(17, 8, col3);
cache.add(17, 8, col3, std::numeric_limits<UInt64>::max());

cache.del(10);

Expand All @@ -119,4 +119,52 @@ TEST(ColumnSharingCacheTest, Del)
compareColumn(col5, col2, col5->size());
}

TEST(ColumnSharingCacheTest, AddAndGetWithLimitation)
{
ColumnSharingCache cache;

auto col = createColumn(8);
// Limit to 0, add should fail.
cache.add(1, 8, col, 0);
ColumnPtr col1;
auto st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col1, nullptr);

// Limit to 1, add should succ.
cache.add(1, 8, col, 1);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col1, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_HIT);
ASSERT_EQ(col1->size(), 8 * TEST_PACK_ROWS);
compareColumn(col1, col, col1->size());
ColumnPtr col2;
st = cache.get(1, 7, 7 * TEST_PACK_ROWS, col2, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col2->size(), 7 * TEST_PACK_ROWS);
compareColumn(col2, col, col2->size());
ColumnPtr col3;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col3, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col3, nullptr);
ColumnPtr col4;
st = cache.get(2, 8, 8 * TEST_PACK_ROWS, col4, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_MISS);
ASSERT_EQ(col4, nullptr);

auto col7 = createColumn(9);
// Limit to 1, add should fail.
cache.add(1, 9, col7, 1);
ColumnPtr col8;
st = cache.get(1, 9, 9 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_PART);
ASSERT_EQ(col8, nullptr);

// Limit to 2, add should succ.
cache.add(1, 9, col7, 2);
st = cache.get(1, 8, 8 * TEST_PACK_ROWS, col8, TEST_DATA_TYPE);
ASSERT_EQ(st, ColumnCacheStatus::GET_COPY);
ASSERT_EQ(col8->size(), 8 * TEST_PACK_ROWS);
compareColumn(col8, col7, col8->size());
}

} // namespace DB::DM::tests

0 comments on commit 9f7e054

Please sign in to comment.