Skip to content

Commit

Permalink
Storage: add keyspaceID to DMFile (#8540)
Browse files Browse the repository at this point in the history
ref #8351
  • Loading branch information
Lloyd-Pottiger committed Dec 22, 2023
1 parent 4fc7f2e commit 2000e2f
Show file tree
Hide file tree
Showing 20 changed files with 120 additions and 68 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
db_context->getSettingsRef());
// Write
{
dmfile = DB::DM::DMFile::create(1, getTemporaryPath(), std::nullopt, 0, 0, DMFileFormat::V0);
dmfile = DB::DM::DMFile::create(1, getTemporaryPath(), std::nullopt, 0, 0, NullspaceID, DMFileFormat::V0);
{
auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines);
stream.writePrefix();
Expand All @@ -116,6 +116,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
std::make_optional<DMChecksumConfig>(),
128 * 1024,
16 * 1024 * 1024,
NullspaceID,
DMFileFormat::V3);
{
auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfileV3, *defines);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
file_id,
file_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());
DMFile::ReadMetaMode::all(),
dm_context.keyspace_id);
auto res = path_delegate.updateDTFileSize(file_id, dmfile->getBytesOnDisk());
RUNTIME_CHECK_MSG(res, "update dt file size failed, path={}", dmfile->path());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ bool SSTFilesToDTFilesOutputStream<ChildStream>::newDTFileStream()
parent_path,
storage->createChecksumConfig(),
context.getGlobalContext().getSettingsRef().dt_small_file_size_threshold,
context.getGlobalContext().getSettingsRef().dt_merged_file_max_size);
context.getGlobalContext().getSettingsRef().dt_merged_file_max_size,
storage->getKeyspaceID());
dt_stream = std::make_unique<DMFileBlockOutputStream>(context, dt_file, *(schema_snap->column_defines));
dt_stream->writePrefix();
ingest_files.emplace_back(dt_file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include <Storages/Page/PageDefinesBase.h>

#include <memory>
#include <string_view>

namespace DB
{
Expand Down Expand Up @@ -168,7 +167,7 @@ class MockSSTFilesToDTFilesOutputStreamChild : private boost::noncopyable

// Mock implementation: always returns a value-initialized (`{}`) ProcessKeys.
static SSTFilesToBlockInputStream::ProcessKeys getProcessKeys() { return {}; }

// Mock implementation: always reports DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT as the split id.
// Both forms below return the same constant; the second is the `static` variant, which does not
// need an object instance because no member state is read.
size_t getSplitId() const { return DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT; }
static size_t getSplitId() { return DM::SSTScanSoftLimit::HEAD_OR_ONLY_SPLIT; }

protected:
BlockInputStreamPtr mock_data;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
file_id,
/* page_id= */ new_page_id,
file_parent_path,
DMFile::ReadMetaMode::all());
DMFile::ReadMetaMode::all(),
dm_context.keyspace_id);

auto new_column_file = f->cloneWith(dm_context, new_file, target_range);
cloned.push_back(new_column_file);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ BlockInputStreams DeltaMergeStore::read(
UInt64 max_version,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
const int rf_max_wait_time_ms,
int rf_max_wait_time_ms,
const String & tracing_id,
bool keep_order,
bool is_fast_scan,
Expand Down Expand Up @@ -1251,7 +1251,7 @@ void DeltaMergeStore::read(
UInt64 max_version,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
const int rf_max_wait_time_ms,
int rf_max_wait_time_ms,
const String & tracing_id,
bool keep_order,
bool is_fast_scan,
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class DeltaMergeStore : private boost::noncopyable
UInt64 max_version,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
const int rf_max_wait_time_ms,
int rf_max_wait_time_ms,
const String & tracing_id,
bool keep_order,
bool is_fast_scan = false,
Expand All @@ -433,7 +433,7 @@ class DeltaMergeStore : private boost::noncopyable
UInt64 max_version,
const PushDownFilterPtr & filter,
const RuntimeFilteList & runtime_filter_list,
const int rf_max_wait_time_ms,
int rf_max_wait_time_ms,
const String & tracing_id,
bool keep_order,
bool is_fast_scan = false,
Expand Down Expand Up @@ -516,6 +516,7 @@ class DeltaMergeStore : private boost::noncopyable
// Returns a const reference to the `settings` member.
const Settings & getSettings() const { return settings; }
// Returns the `type` field of `original_table_handle_define`.
DataTypePtr getPKDataType() const { return original_table_handle_define.type; }
// Declaration only; the definition is not part of this view.
SortDescription getPrimarySortDescription() const;
// Returns the `keyspace_id` member by value.
KeyspaceID getKeyspaceID() const { return keyspace_id; }

void check(const Context & db_context);

Expand Down
18 changes: 13 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ void DeltaMergeStore::cleanPreIngestFiles(
f.id,
f.id,
file_parent_path,
DM::DMFile::ReadMetaMode::memoryAndDiskSize());
DM::DMFile::ReadMetaMode::memoryAndDiskSize(),
keyspace_id);
removePreIngestFile(f.id, false);
file->remove(file_provider);
}
Expand Down Expand Up @@ -181,8 +182,13 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
const auto & file_parent_path = file->parentPath();
auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);

auto ref_file
= DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all());
auto ref_file = DMFile::restore(
file_provider,
file_id,
page_id,
file_parent_path,
DMFile::ReadMetaMode::all(),
keyspace_id);
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
}
Expand Down Expand Up @@ -464,7 +470,8 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
file->fileId(),
new_page_id,
file->parentPath(),
DMFile::ReadMetaMode::all());
DMFile::ReadMetaMode::all(),
keyspace_id);
wbs.data.putRefPage(new_page_id, file->pageId());

// We have to commit those file_ids to PageStorage before applying the ingest, because after the write
Expand Down Expand Up @@ -653,7 +660,8 @@ UInt64 DeltaMergeStore::ingestFiles(
external_file.id,
external_file.id,
file_parent_path,
DMFile::ReadMetaMode::memoryAndDiskSize());
DMFile::ReadMetaMode::memoryAndDiskSize(),
keyspace_id);
}
else
{
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,13 @@ class LocalDMFileGcRemover final
continue;

// Note that page_id is useless here.
auto dmfile = DMFile::restore(file_provider, id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none());
auto dmfile = DMFile::restore(
file_provider,
id,
/* page_id= */ 0,
path,
DMFile::ReadMetaMode::none(),
path_pool->getKeyspaceID());
if (unlikely(!dmfile))
{
// If the dtfile directory does not exist, it means `StoragePathPool::drop` has been
Expand Down
22 changes: 12 additions & 10 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include <common/logger_useful.h>
#include <fmt/format.h>

#include <atomic>
#include <boost/algorithm/string/classification.hpp>
#include <filesystem>
#include <utility>
Expand Down Expand Up @@ -121,6 +120,7 @@ DMFilePtr DMFile::create(
DMConfigurationOpt configuration,
UInt64 small_file_size_threshold,
UInt64 merged_file_max_size,
KeyspaceID keyspace_id,
DMFileFormat::Version version)
{
// if small_file_size_threshold == 0 we should use DMFileFormat::V2
Expand All @@ -142,7 +142,8 @@ DMFilePtr DMFile::create(
small_file_size_threshold,
merged_file_max_size,
std::move(configuration),
version));
version,
keyspace_id));

auto path = new_dmfile->path();
Poco::File file(path);
Expand All @@ -167,7 +168,8 @@ DMFilePtr DMFile::restore(
UInt64 file_id,
UInt64 page_id,
const String & parent_path,
const ReadMetaMode & read_meta_mode)
const ReadMetaMode & read_meta_mode,
KeyspaceID keyspace_id)
{
auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile();
if (!is_s3_file)
Expand All @@ -181,7 +183,7 @@ DMFilePtr DMFile::restore(
return nullptr;
}

DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, Status::READABLE));
DMFilePtr dmfile(new DMFile(file_id, page_id, parent_path, Status::READABLE, keyspace_id));
if (is_s3_file || Poco::File(dmfile->metav2Path()).exists())
{
auto s = dmfile->readMetaV2(file_provider);
Expand Down Expand Up @@ -321,7 +323,7 @@ String DMFile::colMarkFileName(const FileNameBase & file_name_base)
return file_name_base + details::MARK_FILE_SUFFIX;
}

DMFile::OffsetAndSize DMFile::writeMetaToBuffer(WriteBuffer & buffer)
DMFile::OffsetAndSize DMFile::writeMetaToBuffer(WriteBuffer & buffer) const
{
size_t meta_offset = buffer.count();
writeString("DTFile format: ", buffer);
Expand Down Expand Up @@ -646,7 +648,7 @@ void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const
}
writeMetadata(file_provider, write_limiter);
if (unlikely(status != Status::WRITING))
throw Exception("Expected WRITING status, now " + statusString(status));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected WRITING status, now {}", magic_enum::enum_name(status));
Poco::File old_file(path());
setStatus(Status::READABLE);

Expand Down Expand Up @@ -849,7 +851,7 @@ DMFile::MetaBlockHandle DMFile::writeSLPackStatToBuffer(WriteBuffer & buffer)
return MetaBlockHandle{MetaBlockType::PackStat, offset, buffer.count() - offset};
}

DMFile::MetaBlockHandle DMFile::writeSLPackPropertyToBuffer(WriteBuffer & buffer)
DMFile::MetaBlockHandle DMFile::writeSLPackPropertyToBuffer(WriteBuffer & buffer) const
{
auto offset = buffer.count();
for (const auto & pb : pack_properties.property())
Expand Down Expand Up @@ -923,7 +925,7 @@ void DMFile::finalizeMetaV2(WriteBuffer & buffer)
writePODBinary(footer, buffer);
}

std::vector<char> DMFile::readMetaV2(const FileProviderPtr & file_provider)
std::vector<char> DMFile::readMetaV2(const FileProviderPtr & file_provider) const
{
auto rbuf = openForRead(file_provider, metav2Path(), encryptionMetav2Path(), meta_buffer_size);
std::vector<char> buf(meta_buffer_size);
Expand Down Expand Up @@ -1065,7 +1067,7 @@ void DMFile::finalizeDirName()
status == Status::WRITING,
"FileId={} Expected WRITING status, but {}",
file_id,
statusString(status));
magic_enum::enum_name(status));
Poco::File old_file(path());
setStatus(Status::READABLE);
auto new_path = path();
Expand All @@ -1083,7 +1085,7 @@ void DMFile::finalizeDirName()
old_file.renameTo(new_path);
}

std::vector<String> DMFile::listFilesForUpload()
std::vector<String> DMFile::listFilesForUpload() const
{
RUNTIME_CHECK(useMetaV2());
std::vector<String> fnames;
Expand Down
36 changes: 13 additions & 23 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <Storages/S3/S3Filename.h>
#include <Storages/S3/S3RandomAccessFile.h>
#include <common/logger_useful.h>

namespace DB::DM
{
class DMFile;
Expand Down Expand Up @@ -58,23 +59,6 @@ class DMFile : private boost::noncopyable
DROPPED,
};

// Converts `status` to its enumerator name as a string:
// "WRITABLE", "WRITING", "READABLE" or "DROPPED".
// Throws Exception with the message "Unexpected status: <integer value>" when `status`
// is not one of the four enumerators handled below.
static String statusString(Status status)
{
switch (status)
{
case WRITABLE:
return "WRITABLE";
case WRITING:
return "WRITING";
case READABLE:
return "READABLE";
case DROPPED:
return "DROPPED";
default:
// Not a handled enumerator: report the raw integer value in the exception message.
throw Exception("Unexpected status: " + DB::toString(static_cast<int>(status)));
}
}

struct ReadMetaMode
{
private:
Expand Down Expand Up @@ -219,14 +203,16 @@ class DMFile : private boost::noncopyable
DMConfigurationOpt configuration = std::nullopt,
UInt64 small_file_size_threshold = 128 * 1024,
UInt64 merged_file_max_size = 16 * 1024 * 1024,
KeyspaceID keyspace_id = NullspaceID,
DMFileFormat::Version = STORAGE_FORMAT_CURRENT.dm_file);

static DMFilePtr restore(
const FileProviderPtr & file_provider,
UInt64 file_id,
UInt64 page_id,
const String & parent_path,
const ReadMetaMode & read_meta_mode);
const ReadMetaMode & read_meta_mode,
KeyspaceID keyspace_id = NullspaceID);

struct ListOptions
{
Expand Down Expand Up @@ -329,7 +315,7 @@ class DMFile : private boost::noncopyable
}

// Returns the fixed file name "meta".
// NOTE(review): judging by the name, this is presumably the file name used for the
// V2-format metadata file — confirm against metav2Path().
static String metav2FileName() { return "meta"; }
std::vector<String> listFilesForUpload();
std::vector<String> listFilesForUpload() const;
void switchToRemote(const S3::DMFileOID & oid);

#ifndef DBMS_PUBLIC_GTEST
Expand All @@ -345,9 +331,11 @@ class DMFile : private boost::noncopyable
UInt64 small_file_size_threshold_ = 128 * 1024,
UInt64 merged_file_max_size_ = 16 * 1024 * 1024,
DMConfigurationOpt configuration_ = std::nullopt,
DMFileFormat::Version version_ = STORAGE_FORMAT_CURRENT.dm_file)
DMFileFormat::Version version_ = STORAGE_FORMAT_CURRENT.dm_file,
KeyspaceID keyspace_id_ = NullspaceID)
: file_id(file_id_)
, page_id(page_id_)
, keyspace_id(keyspace_id_)
, parent_path(std::move(parent_path_))
, status(status_)
, configuration(std::move(configuration_))
Expand Down Expand Up @@ -423,7 +411,7 @@ class DMFile : private boost::noncopyable
static String colMarkFileName(const FileNameBase & file_name_base);

using OffsetAndSize = std::tuple<size_t, size_t>;
OffsetAndSize writeMetaToBuffer(WriteBuffer & buffer);
OffsetAndSize writeMetaToBuffer(WriteBuffer & buffer) const;
OffsetAndSize writePackStatToBuffer(WriteBuffer & buffer);
OffsetAndSize writePackPropertyToBuffer(WriteBuffer & buffer, UnifiedDigestBase * digest = nullptr);

Expand Down Expand Up @@ -464,10 +452,10 @@ class DMFile : private boost::noncopyable
static constexpr size_t meta_buffer_size = 64 * 1024;
void finalizeMetaV2(WriteBuffer & buffer);
MetaBlockHandle writeSLPackStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeSLPackPropertyToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeSLPackPropertyToBuffer(WriteBuffer & buffer) const;
MetaBlockHandle writeColumnStatToBuffer(WriteBuffer & buffer);
MetaBlockHandle writeMergedSubFilePosotionsToBuffer(WriteBuffer & buffer);
std::vector<char> readMetaV2(const FileProviderPtr & file_provider);
std::vector<char> readMetaV2(const FileProviderPtr & file_provider) const;
void parseMetaV2(std::string_view buffer);
void parseColumnStat(std::string_view buffer);
void parseMergedSubFilePos(std::string_view buffer);
Expand All @@ -484,6 +472,8 @@ class DMFile : private boost::noncopyable
UInt64 file_id;
// It is the page_id that represent this file in the PageStorage. It could be the same as file id.
UInt64 page_id;
// The id of the keyspace that this file belongs to.
KeyspaceID keyspace_id;
String parent_path;

PackStats pack_stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::build(
RUNTIME_CHECK(
dmfile->getStatus() == DMFile::Status::READABLE,
dmfile->fileId(),
DMFile::statusString(dmfile->getStatus()));
magic_enum::enum_name(dmfile->getStatus()));

// if `rowkey_ranges` is empty, we unconditionally read all packs
// `rowkey_ranges` and `is_common_handle` will only be useful in clean read mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ DMFilePtr S3PreparedDMFileToken::restore(DMFile::ReadMetaMode read_mode)
oid.file_id,
page_id,
S3::S3Filename::fromTableID(oid.store_id, oid.keyspace_id, oid.table_id).toFullKeyWithPrefix(),
read_mode);
read_mode,
oid.keyspace_id);
}
} // namespace DB::DM::Remote

0 comments on commit 2000e2f

Please sign in to comment.