Skip to content

Commit

Permalink
Merge branch 'unstable' into add-ubsan-support
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 7, 2024
2 parents fa22dd0 + d0b13bd commit f46a100
Show file tree
Hide file tree
Showing 26 changed files with 1,405 additions and 161 deletions.
35 changes: 0 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,41 +183,6 @@ Documents are hosted at the [official website](https://kvrocks.apache.org/docs/g

Kvrocks community welcomes all forms of contribution and you can find out how to get involved on the [Community](https://kvrocks.apache.org/community/) and [How to Contribute](https://kvrocks.apache.org/community/contributing) pages.

## Performance

### Hardware

* CPU: 48 cores Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
* Memory: 32 GiB
* NET: Intel Corporation I350 Gigabit Network Connection
* DISK: 2TB NVMe Intel SSD DC P4600

> Benchmark Client: multi-thread redis-benchmark(unstable branch)
### 1. Commands QPS

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns / 128 payload
latency: 99.9% < 10ms

![image](assets/chart-commands.png)

### 2. QPS on different payloads

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns
latency: 99.9% < 10ms

![image](assets/chart-values.png)

### 3. QPS on different workers

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns / 128 payload
latency: 99.9% < 10ms

![image](assets/chart-threads.png)

## License

Apache Kvrocks is licensed under the Apache License Version 2.0. See the [LICENSE](LICENSE) file for details.
Expand Down
Binary file removed assets/chart-commands.png
Binary file not shown.
Binary file removed assets/chart-threads.png
Binary file not shown.
Binary file removed assets/chart-values.png
Binary file not shown.
2 changes: 1 addition & 1 deletion src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
assert(handler_idx_ <= handlers_.size());
DLOG(INFO) << "[replication] Execute handler[" << getHandlerName(handler_idx_) << "]";
auto st = getHandlerFunc(handler_idx_)(repl_, bev);
repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
repl_->last_io_time_secs_.store(util::GetTimeStamp(), std::memory_order_relaxed);
switch (st) {
case CBState::NEXT:
++handler_idx_;
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
int64_t LastIOTimeSecs() const { return last_io_time_secs_.load(std::memory_order_relaxed); }

void TimerCB(int, int16_t);

Expand Down Expand Up @@ -155,7 +155,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
Server *srv_ = nullptr;
engine::Storage *storage_ = nullptr;
std::atomic<ReplState> repl_state_;
std::atomic<time_t> last_io_time_ = 0;
std::atomic<int64_t> last_io_time_secs_ = 0;
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;

Expand Down
118 changes: 117 additions & 1 deletion src/commands/cmd_key.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,120 @@ class CommandCopy : public Commander {
bool replace_ = false;
};

template <bool ReadOnly>
class CommandSort : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
while (parser.Good()) {
if (parser.EatEqICase("BY")) {
if (!sort_argument_.sortby.empty()) return {Status::InvalidArgument, "don't use multiple BY parameters"};
sort_argument_.sortby = GET_OR_RET(parser.TakeStr());

if (sort_argument_.sortby.find('*') == std::string::npos) {
sort_argument_.dontsort = true;
} else {
/* TODO:
* If BY is specified with a real pattern, we can't accept it in cluster mode,
* unless we can make sure the keys formed by the pattern are in the same slot
* as the key to sort.
* If BY is specified with a real pattern, we can't accept
* it if no full ACL key access is applied for this command. */
}
} else if (parser.EatEqICase("LIMIT")) {
sort_argument_.offset = GET_OR_RET(parser.template TakeInt<int>());
sort_argument_.count = GET_OR_RET(parser.template TakeInt<int>());
} else if (parser.EatEqICase("GET")) {
/* TODO:
* If GET is specified with a real pattern, we can't accept it in cluster mode,
* unless we can make sure the keys formed by the pattern are in the same slot
* as the key to sort. */
sort_argument_.getpatterns.push_back(GET_OR_RET(parser.TakeStr()));
} else if (parser.EatEqICase("ASC")) {
sort_argument_.desc = false;
} else if (parser.EatEqICase("DESC")) {
sort_argument_.desc = true;
} else if (parser.EatEqICase("ALPHA")) {
sort_argument_.alpha = true;
} else if (parser.EatEqICase("STORE")) {
if constexpr (ReadOnly) {
return {Status::RedisParseErr, "SORT_RO is read-only and does not support the STORE parameter"};
}
sort_argument_.storekey = GET_OR_RET(parser.TakeStr());
} else {
return parser.InvalidSyntax();
}
}

return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Database redis(srv->storage, conn->GetNamespace());
RedisType type = kRedisNone;
if (auto s = redis.Type(args_[1], &type); !s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (type != RedisType::kRedisList && type != RedisType::kRedisSet && type != RedisType::kRedisZSet) {
*output = Error("WRONGTYPE Operation against a key holding the wrong kind of value");
return Status::OK();
}

/* When sorting a set with no sort specified, we must sort the output
* so the result is consistent across scripting and replication.
*
* The other types (list, sorted set) will retain their native order
* even if no sort order is requested, so they remain stable across
* scripting and replication.
*
* TODO: support CLIENT_SCRIPT flag, (!storekey_.empty() || c->flags & CLIENT_SCRIPT)) */
if (sort_argument_.dontsort && type == RedisType::kRedisSet && (!sort_argument_.storekey.empty())) {
/* Force ALPHA sorting */
sort_argument_.dontsort = false;
sort_argument_.alpha = true;
sort_argument_.sortby = "";
}

std::vector<std::optional<std::string>> sorted_elems;
Database::SortResult res = Database::SortResult::DONE;

if (auto s = redis.Sort(type, args_[1], sort_argument_, &sorted_elems, &res); !s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

switch (res) {
case Database::SortResult::UNKNOWN_TYPE:
*output = redis::Error("Unknown Type");
break;
case Database::SortResult::DOUBLE_CONVERT_ERROR:
*output = redis::Error("One or more scores can't be converted into double");
break;
case Database::SortResult::LIMIT_EXCEEDED:
*output = redis::Error("The number of elements to be sorted exceeds SORT_LENGTH_LIMIT = " +
std::to_string(SORT_LENGTH_LIMIT));
break;
case Database::SortResult::DONE:
if (sort_argument_.storekey.empty()) {
std::vector<std::string> output_vec;
output_vec.reserve(sorted_elems.size());
for (const auto &elem : sorted_elems) {
output_vec.emplace_back(elem.has_value() ? redis::BulkString(elem.value()) : conn->NilString());
}
*output = redis::Array(output_vec);
} else {
*output = Integer(sorted_elems.size());
}
break;
}

return Status::OK();
}

private:
SortArgument sort_argument_;
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandPTTL>("pttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandType>("type", 2, "read-only", 1, 1, 1),
Expand All @@ -442,6 +556,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandTTL>("ttl", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandDel>("unlink", -2, "write no-dbsize-check", 1, -1, 1),
MakeCmdAttr<CommandRename>("rename", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandRenameNX>("renamenx", 3, "write", 1, 2, 1),
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1), )
MakeCmdAttr<CommandCopy>("copy", -3, "write", 1, 2, 1),
MakeCmdAttr<CommandSort<false>>("sort", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandSort<true>>("sort_ro", -2, "read-only", 1, 1, 1))

} // namespace redis
8 changes: 4 additions & 4 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ class CommandFetchMeta : public Commander {
} else {
LOG(WARNING) << "[replication] Fail to send full data file info " << ip << ", error: " << strerror(errno);
}
auto now = static_cast<time_t>(util::GetTimeStamp());
srv->storage->SetCheckpointAccessTime(now);
auto now_secs = static_cast<time_t>(util::GetTimeStamp());
srv->storage->SetCheckpointAccessTimeSecs(now_secs);
}));

if (auto s = util::ThreadDetach(t); !s) {
Expand Down Expand Up @@ -311,8 +311,8 @@ class CommandFetchFile : public Commander {
usleep(shortest - duration);
}
}
auto now = static_cast<time_t>(util::GetTimeStamp());
srv->storage->SetCheckpointAccessTime(now);
auto now_secs = util::GetTimeStamp<std::chrono::seconds>();
srv->storage->SetCheckpointAccessTimeSecs(now_secs);
srv->DecrFetchFileThread();
}));

Expand Down
6 changes: 3 additions & 3 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -588,17 +588,17 @@ class CommandXInfo : public Commander {
}

output->append(redis::MultiLen(result_vector.size()));
auto now = util::GetTimeStampMS();
auto now_ms = util::GetTimeStampMS();
for (auto const &it : result_vector) {
output->append(conn->HeaderOfMap(4));
output->append(redis::BulkString("name"));
output->append(redis::BulkString(it.first));
output->append(redis::BulkString("pending"));
output->append(redis::Integer(it.second.pending_number));
output->append(redis::BulkString("idle"));
output->append(redis::Integer(now - it.second.last_idle));
output->append(redis::Integer(now_ms - it.second.last_idle_ms));
output->append(redis::BulkString("inactive"));
output->append(redis::Integer(now - it.second.last_active));
output->append(redis::Integer(now_ms - it.second.last_active_ms));
}

return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion src/common/cron.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Status Cron::SetScheduleTime(const std::vector<std::string> &args) {
return Status::OK();
}

bool Cron::IsTimeMatch(tm *tm) {
bool Cron::IsTimeMatch(const tm *tm) {
if (tm->tm_min == last_tm_.tm_min && tm->tm_hour == last_tm_.tm_hour && tm->tm_mday == last_tm_.tm_mday &&
tm->tm_mon == last_tm_.tm_mon && tm->tm_wday == last_tm_.tm_wday) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/common/cron.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class Cron {
~Cron() = default;

Status SetScheduleTime(const std::vector<std::string> &args);
bool IsTimeMatch(tm *tm);
bool IsTimeMatch(const tm *tm);
std::string ToString() const;
bool IsEnabled() const;

Expand Down
1 change: 1 addition & 0 deletions src/common/time_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

namespace util {

/// Get the system timestamp in seconds, milliseconds or microseconds.
template <typename Duration = std::chrono::seconds>
auto GetTimeStamp() {
return std::chrono::duration_cast<Duration>(std::chrono::system_clock::now().time_since_epoch()).count();
Expand Down

0 comments on commit f46a100

Please sign in to comment.