Skip to content

Commit

Permalink
cpp: read messages in deterministic order, yield message offset in Me…
Browse files Browse the repository at this point in the history
…ssageView (#760)

* cpp: time range updates for rosbag2 correctness

* cpp: remove tiebreaker parameter

* cpp: rename messageoffset

* cpp: test record offset inequality

* use end-exclusive time-ranges again

* reader: test chunk read order

* cpp: update version to 0.8.0
  • Loading branch information
james-rms committed Dec 7, 2022
1 parent c0f67af commit 801c4ae
Show file tree
Hide file tree
Showing 14 changed files with 407 additions and 32 deletions.
3 changes: 3 additions & 0 deletions cpp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ default: build
dev-image:
docker build -t mcap_cpp_dev -f dev.Dockerfile .

dev-shell: dev-image
docker run --rm -v $(CURDIR):/src -it mcap_cpp_dev /bin/bash

.PHONY: build
build: dev-image
docker run -t --rm -v $(CURDIR):/src mcap_cpp_dev
Expand Down
2 changes: 1 addition & 1 deletion cpp/bench/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class McapBenchmarksConan(ConanFile):
settings = "os", "compiler", "build_type", "arch"
generators = "cmake"
requires = "benchmark/1.7.0", "mcap/0.7.0"
requires = "benchmark/1.7.0", "mcap/0.8.0"

def build(self):
cmake = CMake(self)
Expand Down
2 changes: 1 addition & 1 deletion cpp/build-docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -e

conan config init

conan editable add ./mcap mcap/0.7.0
conan editable add ./mcap mcap/0.8.0
conan install docs --install-folder docs/build/Release \
-s compiler.cppstd=17 -s build_type=Release --build missing

Expand Down
2 changes: 1 addition & 1 deletion cpp/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -e

conan config init

conan editable add ./mcap mcap/0.7.0
conan editable add ./mcap mcap/0.8.0
conan install test --install-folder test/build/Debug \
-s compiler.cppstd=17 -s build_type=Debug --build missing

Expand Down
2 changes: 1 addition & 1 deletion cpp/docs/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class McapDocsConan(ConanFile):
settings = "os", "compiler", "build_type", "arch"
generators = "cmake"
requires = "mcap/0.7.0"
requires = "mcap/0.8.0"

def build(self):
cmake = CMake(self)
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class McapExamplesConan(ConanFile):
settings = "os", "compiler", "build_type", "arch"
generators = "cmake"
requires = [
"mcap/0.7.0",
"mcap/0.8.0",
"protobuf/3.21.1",
"nlohmann_json/3.10.5",
"catch2/2.13.8",
Expand Down
2 changes: 1 addition & 1 deletion cpp/mcap/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

class McapConan(ConanFile):
name = "mcap"
version = "0.7.0"
version = "0.8.0"
url = "https://github.com/foxglove/mcap"
homepage = "https://github.com/foxglove/mcap"
description = "A C++ implementation of the MCAP file format"
Expand Down
38 changes: 34 additions & 4 deletions cpp/mcap/include/mcap/read_job_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ inline constexpr bool always_false_v = false;
*/
struct ReadMessageJob {
Timestamp timestamp;
ByteOffset offset;
RecordOffset offset;
size_t chunkReaderIndex;
};

Expand Down Expand Up @@ -48,7 +48,7 @@ struct ReadJobQueue {
/**
* @brief return the timestamp key that should be used to compare jobs.
*/
static Timestamp ComparisonKey(const ReadJob& job, bool reverse) {
static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) {
Timestamp result = 0;
std::visit(
[&](auto&& arg) {
Expand All @@ -68,13 +68,43 @@ struct ReadJobQueue {
job);
return result;
}
/**
 * @brief return the position key that should be used to order jobs whose timestamp keys are equal.
 *
 * A ReadMessageJob is keyed by its own record offset. A DecompressChunkJob is keyed by a
 * RecordOffset with no chunk offset set, whose offset is the chunk start offset, or the message
 * index end offset when `reverse` is true.
 */
static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) {
  return std::visit(
    [reverse](auto&& candidate) -> RecordOffset {
      using JobType = std::decay_t<decltype(candidate)>;
      if constexpr (std::is_same_v<JobType, ReadMessageJob>) {
        return candidate.offset;
      } else if constexpr (std::is_same_v<JobType, DecompressChunkJob>) {
        RecordOffset key;
        key.offset = reverse ? candidate.messageIndexEndOffset : candidate.chunkStartOffset;
        return key;
      } else {
        static_assert(always_false_v<JobType>, "non-exhaustive visitor!");
      }
    },
    job);
}

/**
 * @brief Compares two jobs using their non-reversed keys (`reverse` = false).
 * Returns true when `a`'s key is greater than `b`'s: the time key is compared first, and the
 * position key is compared only when the time keys are equal.
 */
static bool CompareForward(const ReadJob& a, const ReadJob& b) {
// NOTE(review): the next line looks like the pre-change body kept by the diff rendering
// (it calls the old ComparisonKey name); the lines after it are the replacement - confirm.
return ComparisonKey(a, false) > ComparisonKey(b, false);
auto aTimestamp = TimeComparisonKey(a, false);
auto bTimestamp = TimeComparisonKey(b, false);
if (aTimestamp == bTimestamp) {
// equal time keys: fall back to the position key to break the tie.
return PositionComparisonKey(a, false) > PositionComparisonKey(b, false);
}
return aTimestamp > bTimestamp;
}

/**
 * @brief Compares two jobs using their reversed keys (`reverse` = true).
 * Returns true when `a`'s key is less than `b`'s: the time key is compared first, and the
 * position key is compared only when the time keys are equal.
 */
static bool CompareReverse(const ReadJob& a, const ReadJob& b) {
// NOTE(review): the next line looks like the pre-change body kept by the diff rendering
// (it calls the old ComparisonKey name); the lines after it are the replacement - confirm.
return ComparisonKey(a, true) < ComparisonKey(b, true);
auto aTimestamp = TimeComparisonKey(a, true);
auto bTimestamp = TimeComparisonKey(b, true);
if (aTimestamp == bTimestamp) {
// equal time keys: fall back to the position key to break the tie.
return PositionComparisonKey(a, true) < PositionComparisonKey(b, true);
}
return aTimestamp < bTimestamp;
}

public:
Expand Down
7 changes: 4 additions & 3 deletions cpp/mcap/include/mcap/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ struct MCAP_PUBLIC TypedRecordReader {
struct MCAP_PUBLIC IndexedMessageReader {
public:
IndexedMessageReader(McapReader& reader, const ReadMessageOptions& options,
const std::function<void(const Message&)> onMessage);
const std::function<void(const Message&, RecordOffset)> onMessage);

/**
* @brief reads the next message out of the MCAP.
Expand All @@ -612,6 +612,7 @@ struct MCAP_PUBLIC IndexedMessageReader {
private:
// Per-chunk working state used by IndexedMessageReader while messages from that chunk are queued.
struct ChunkSlot {
// Buffer holding the chunk's contents; queued message records are read back out of it.
ByteArray decompressedChunk;
// Start offset of the chunk loaded into this slot, copied from the DecompressChunkJob.
ByteOffset chunkStartOffset;
// Incremented each time a ReadMessageJob referring to this slot is pushed onto the queue.
int unreadMessages = 0;
};
size_t findFreeChunkSlot();
Expand All @@ -622,7 +623,7 @@ struct MCAP_PUBLIC IndexedMessageReader {
LZ4Reader lz4Reader_;
ReadMessageOptions options_;
std::unordered_set<ChannelId> selectedChannels_;
std::function<void(const Message&)> onMessage_;
std::function<void(const Message&, RecordOffset)> onMessage_;
ReadJobQueue queue_;
std::vector<ChunkSlot> chunkSlots_;
};
Expand Down Expand Up @@ -675,7 +676,7 @@ struct MCAP_PUBLIC LinearMessageView {
std::optional<MessageView> curMessageView_;

private:
void onMessage(const Message& message);
void onMessage(const Message& message, RecordOffset offset);
};

std::unique_ptr<Impl> impl_;
Expand Down
44 changes: 29 additions & 15 deletions cpp/mcap/include/mcap/reader.inl
Original file line number Diff line number Diff line change
Expand Up @@ -1623,12 +1623,17 @@ LinearMessageView::Iterator::Impl::Impl(McapReader& mcapReader, ByteOffset dataS
std::optional<ByteOffset>) {
mcapReader_.channels_.insert_or_assign(channel->id, channel);
};
recordReader_->onMessage =
std::bind(&LinearMessageView::Iterator::Impl::onMessage, this, std::placeholders::_1);
recordReader_->onMessage = [this](const Message& message, ByteOffset messageStartOffset,
std::optional<ByteOffset> chunkStartOffset) {
RecordOffset offset;
offset.chunkOffset = chunkStartOffset;
offset.offset = messageStartOffset;
onMessage(message, offset);
};
} else {
indexedMessageReader_.emplace(
mcapReader, readMessageOptions_,
std::bind(&LinearMessageView::Iterator::Impl::onMessage, this, std::placeholders::_1));
indexedMessageReader_.emplace(mcapReader, readMessageOptions_,
std::bind(&LinearMessageView::Iterator::Impl::onMessage, this,
std::placeholders::_1, std::placeholders::_2));
}

increment();
Expand All @@ -1638,7 +1643,7 @@ LinearMessageView::Iterator::Impl::Impl(McapReader& mcapReader, ByteOffset dataS
* @brief Receives a message from either the linear TypedRecordReader or IndexedMessageReader.
* Sets `curMessageView` with the message along with its associated Channel and Schema.
*/
void LinearMessageView::Iterator::Impl::onMessage(const Message& message) {
void LinearMessageView::Iterator::Impl::onMessage(const Message& message, RecordOffset offset) {
// make sure the message is within the expected time range
if (message.logTime < readMessageOptions_.startTime) {
return;
Expand Down Expand Up @@ -1672,7 +1677,7 @@ void LinearMessageView::Iterator::Impl::onMessage(const Message& message) {
}

curMessage_ = message; // copy message, which may be a reference to a temporary
curMessageView_.emplace(curMessage_, maybeChannel, maybeSchema);
curMessageView_.emplace(curMessage_, maybeChannel, maybeSchema, offset);
}

void LinearMessageView::Iterator::Impl::increment() {
Expand Down Expand Up @@ -1755,8 +1760,9 @@ Status ReadMessageOptions::validate() const {
}

// IndexedMessageReader ///////////////////////////////////////////////////////////
IndexedMessageReader::IndexedMessageReader(McapReader& reader, const ReadMessageOptions& options,
const std::function<void(const Message&)> onMessage)
IndexedMessageReader::IndexedMessageReader(
McapReader& reader, const ReadMessageOptions& options,
const std::function<void(const Message&, RecordOffset)> onMessage)
: mcapReader_(reader)
, recordReader_(*mcapReader_.dataSource(), 0, 0)
, options_(options)
Expand All @@ -1782,10 +1788,16 @@ IndexedMessageReader::IndexedMessageReader(McapReader& reader, const ReadMessage
}
// Initialize the read job queue by finding all of the chunks that need to be read from.
for (const auto& chunkIndex : mcapReader_.chunkIndexes()) {
if (chunkIndex.messageStartTime >= options_.endTime) {
// chunk starts after requested time range, skip it.
continue;
}
if (chunkIndex.messageEndTime < options_.startTime) {
// chunk ends before requested time range starts, skip it.
continue;
}
for (const auto& channelId : selectedChannels_) {
if (chunkIndex.messageIndexOffsets.find(channelId) != chunkIndex.messageIndexOffsets.end() &&
chunkIndex.messageStartTime <= options_.endTime &&
chunkIndex.messageEndTime > options_.startTime) {
if (chunkIndex.messageIndexOffsets.find(channelId) != chunkIndex.messageIndexOffsets.end()) {
DecompressChunkJob job;
job.chunkStartOffset = chunkIndex.chunkStartOffset;
job.messageIndexEndOffset =
Expand Down Expand Up @@ -1845,6 +1857,7 @@ bool IndexedMessageReader::next() {
// First, find a chunk slot to decompress this chunk into.
size_t chunkReaderIndex = findFreeChunkSlot();
auto& chunkSlot = chunkSlots_[chunkReaderIndex];
chunkSlot.chunkStartOffset = decompressChunkJob.chunkStartOffset;
// Point the record reader at the chunk and message indices after it.
recordReader_.reset(*mcapReader_.dataSource(), decompressChunkJob.chunkStartOffset,
decompressChunkJob.messageIndexEndOffset);
Expand Down Expand Up @@ -1873,7 +1886,8 @@ bool IndexedMessageReader::next() {
if (timestamp >= options_.startTime && timestamp < options_.endTime) {
ReadMessageJob job;
job.chunkReaderIndex = chunkReaderIndex;
job.offset = byteOffset;
job.offset.offset = byteOffset;
job.offset.chunkOffset = decompressChunkJob.chunkStartOffset;
job.timestamp = timestamp;
queue_.push(std::move(job));
chunkSlot.unreadMessages++;
Expand All @@ -1897,7 +1911,7 @@ bool IndexedMessageReader::next() {
BufferReader reader;
reader.reset(chunkSlot.decompressedChunk.data(), chunkSlot.decompressedChunk.size(),
chunkSlot.decompressedChunk.size());
recordReader_.reset(reader, readMessageJob.offset, chunkSlot.decompressedChunk.size());
recordReader_.reset(reader, readMessageJob.offset.offset, chunkSlot.decompressedChunk.size());
auto record = recordReader_.next();
status_ = recordReader_.status();
if (!status_.ok()) {
Expand All @@ -1914,7 +1928,7 @@ bool IndexedMessageReader::next() {
if (!status_.ok()) {
return false;
}
onMessage_(message);
onMessage_(message, readMessageJob.offset);
return true;
}
}
Expand Down
38 changes: 35 additions & 3 deletions cpp/mcap/include/mcap/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
#include <functional>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

namespace mcap {

#define MCAP_LIBRARY_VERSION "0.7.0"
#define MCAP_LIBRARY_VERSION "0.8.0"

using SchemaId = uint16_t;
using ChannelId = uint16_t;
Expand Down Expand Up @@ -349,6 +350,34 @@ struct MCAP_PUBLIC DataEnd {
uint32_t dataSectionCrc;
};

/**
 * @brief Describes where a record is located.
 *
 * When `chunkOffset` is set, the record is inside a chunk: `chunkOffset` is the chunk's
 * file-level offset and `offset` is the record's offset within that chunk. When `chunkOffset`
 * is not set, `offset` is the record's file-level offset.
 *
 * Ordering is defined by `operator>`; `operator<`, `operator<=` and `operator>=` are all derived
 * from it, so that `a < b` and `b < a` can never both be true, even for a pair of offsets where
 * neither is greater than the other and they are not equal.
 */
struct MCAP_PUBLIC RecordOffset {
  // Zero-initialized so a default-constructed RecordOffset never holds an indeterminate value.
  ByteOffset offset = 0;
  std::optional<ByteOffset> chunkOffset;

  RecordOffset() = default;
  explicit RecordOffset(ByteOffset offset_)
      : offset(offset_) {}
  RecordOffset(ByteOffset offset_, ByteOffset chunkOffset_)
      : offset(offset_)
      , chunkOffset(chunkOffset_) {}

  bool operator==(const RecordOffset& other) const;
  bool operator>(const RecordOffset& other) const;

  bool operator!=(const RecordOffset& other) const {
    return !(*this == other);
  }
  bool operator>=(const RecordOffset& other) const {
    // "not less than": keeps >= the exact complement of <.
    return !(other > *this);
  }
  bool operator<(const RecordOffset& other) const {
    // Defined by swapping the operands of > rather than negating >=, which would report
    // both a < b and b < a for offsets that are neither equal nor ordered by >.
    return other > *this;
  }
  bool operator<=(const RecordOffset& other) const {
    return !(*this > other);
  }
};

/**
* @brief Returned when iterating over Messages in a file, MessageView contains
* a reference to one Message, a pointer to its Channel, and an optional pointer
Expand All @@ -359,11 +388,14 @@ struct MCAP_PUBLIC MessageView {
const Message& message;
const ChannelPtr channel;
const SchemaPtr schema;
const RecordOffset messageOffset;

MessageView(const Message& message, const ChannelPtr channel, const SchemaPtr schema)
MessageView(const Message& message, const ChannelPtr channel, const SchemaPtr schema,
RecordOffset offset)
: message(message)
, channel(channel)
, schema(schema) {}
, schema(schema)
, messageOffset(offset) {}
};

} // namespace mcap
Expand Down
39 changes: 39 additions & 0 deletions cpp/mcap/include/mcap/types.inl
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,43 @@ MetadataIndex::MetadataIndex(const Metadata& metadata, ByteOffset fileOffset)
, length(9 + 4 + metadata.name.size() + 4 + internal::KeyValueMapSize(metadata.metadata))
, name(metadata.name) {}

/**
 * @brief Two record offsets are equal when both are in the same chunk (or both are outside of
 * any chunk) and their `offset` values match.
 */
bool RecordOffset::operator==(const RecordOffset& other) const {
  const bool thisInChunk = chunkOffset.has_value();
  const bool otherInChunk = other.chunkOffset.has_value();
  if (thisInChunk != otherInChunk) {
    // exactly one of the records is inside a chunk, so they cannot be the same record.
    return false;
  }
  if (thisInChunk && *chunkOffset != *other.chunkOffset) {
    // the records live in different chunks, so they cannot be the same record.
    return false;
  }
  // same chunk: compare chunk-level offsets. no chunk on either side: compare file-level offsets.
  return offset == other.offset;
}

/**
 * @brief Orders record offsets by position. Records in the same chunk are ordered by their
 * chunk-level offset; in every other case each side is compared by a file-level offset, using
 * the chunk's offset for a record that is inside a chunk.
 */
bool RecordOffset::operator>(const RecordOffset& other) const {
  const bool sameChunk = chunkOffset.has_value() && other.chunkOffset.has_value() &&
                         *chunkOffset == *other.chunkOffset;
  if (sameChunk) {
    // messages are in the same chunk, compare chunk-level offsets.
    return offset > other.offset;
  }
  // otherwise compare file-level offsets: a message inside a chunk is represented by the
  // offset of its chunk, a message outside of any chunk by its own offset.
  const auto thisFileOffset = chunkOffset.value_or(offset);
  const auto otherFileOffset = other.chunkOffset.value_or(other.offset);
  return thisFileOffset > otherFileOffset;
}

} // namespace mcap
2 changes: 1 addition & 1 deletion cpp/test/conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
class McapTestConan(ConanFile):
settings = "os", "compiler", "build_type", "arch"
generators = "cmake"
requires = "catch2/2.13.8", "mcap/0.7.0", "nlohmann_json/3.10.5"
requires = "catch2/2.13.8", "mcap/0.8.0", "nlohmann_json/3.10.5"

def build(self):
cmake = CMake(self)
Expand Down

0 comments on commit 801c4ae

Please sign in to comment.