Skip to content

Commit

Permalink
feat: add cluster mode support for set priority
Browse files Browse the repository at this point in the history
Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

save

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>

fixup

Signed-off-by: Liuqing Yue <dqhl76@gmail.com>
  • Loading branch information
dqhl76 committed May 7, 2024
1 parent e4b93bc commit 94d8780
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 15 deletions.
74 changes: 66 additions & 8 deletions src/query/service/src/interpreters/interpreter_set_priority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,84 @@

use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::plans::SetPriorityPlan;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::packets::Packet;
use crate::servers::flight::v1::packets::SetPriorityPacket;
use crate::sessions::QueryContext;

pub struct SetPriorityInterpreter {
ctx: Arc<QueryContext>,
plan: SetPriorityPlan,
proxy_to_cluster: bool,
}

impl SetPriorityInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: SetPriorityPlan) -> Result<Self> {
Ok(SetPriorityInterpreter { ctx, plan })
Ok(SetPriorityInterpreter {
ctx,
plan,
proxy_to_cluster: true,
})
}

pub fn from_flight(ctx: Arc<QueryContext>, packet: SetPriorityPacket) -> Result<Self> {
Ok(SetPriorityInterpreter {
ctx,
plan: SetPriorityPlan {
id: packet.id,
priority: packet.priority,
},
proxy_to_cluster: false,
})
}

#[async_backtrace::framed]
async fn set_cluster_priority(&self) -> Result<PipelineBuildResult> {
let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let conf = GlobalConfig::instance();
let cluster = self.ctx.get_cluster();
for node_info in &cluster.nodes {
if node_info.id != cluster.local_id {
let set_priority_packet = SetPriorityPacket::create(
self.plan.id.clone(),
self.plan.priority,
node_info.clone(),
);

match set_priority_packet.commit(conf.as_ref(), timeout).await {
Ok(_) => {
return Ok(PipelineBuildResult::create());
}
Err(cause) => match cause.code() == ErrorCode::UNKNOWN_SESSION {
true => {
continue;
}
false => {
return Err(cause);
}
},
}
}
}
Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
self.plan.id
)))
}
}

#[async_trait::async_trait]
impl Interpreter for SetPriorityInterpreter {
fn name(&self) -> &str {
"AdjustPriorityInterpreter"
"SetPriorityInterpreter"
}

fn is_ddl(&self) -> bool {
Expand All @@ -48,14 +103,17 @@ impl Interpreter for SetPriorityInterpreter {
async fn execute2(&self) -> Result<PipelineBuildResult> {
let id = &self.plan.id;
match self.ctx.get_session_by_id(id) {
Some(adjust_session) => {
adjust_session.set_query_priority(self.plan.priority);
None => match self.proxy_to_cluster {
true => self.set_cluster_priority().await,
false => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
},
Some(set_session) => {
set_session.set_query_priority(self.plan.priority);
Ok(PipelineBuildResult::create())
}
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
}
}
}
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ pub use interpreter_role_set_secondary::SetSecondaryRolesInterpreter;
pub use interpreter_select::SelectInterpreter;
pub use interpreter_sequence_create::CreateSequenceInterpreter;
pub use interpreter_sequence_drop::DropSequenceInterpreter;
pub use interpreter_set_priority::SetPriorityInterpreter;
pub use interpreter_setting::SettingInterpreter;
pub use interpreter_share_alter_tenants::AlterShareTenantsInterpreter;
pub use interpreter_share_create::CreateShareInterpreter;
Expand Down
33 changes: 33 additions & 0 deletions src/query/service/src/servers/flight/v1/actions/flight_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tonic::Status;
use crate::servers::flight::v1::packets::InitNodesChannelPacket;
use crate::servers::flight::v1::packets::KillQueryPacket;
use crate::servers::flight::v1::packets::QueryFragmentsPlanPacket;
use crate::servers::flight::v1::packets::SetPriorityPacket;
use crate::servers::flight::v1::packets::TruncateTablePacket;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
Expand Down Expand Up @@ -132,13 +133,40 @@ impl TryInto<Vec<u8>> for KillQuery {
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct SetPriority {
pub packet: SetPriorityPacket,
}

impl TryInto<SetPriority> for Vec<u8> {
type Error = Status;

fn try_into(self) -> Result<SetPriority, Self::Error> {
match serde_json::from_slice::<SetPriority>(&self) {
Err(cause) => Err(Status::invalid_argument(cause.to_string())),
Ok(action) => Ok(action),
}
}
}

impl TryInto<Vec<u8>> for SetPriority {
type Error = ErrorCode;
fn try_into(self) -> Result<Vec<u8>, Self::Error> {
serde_json::to_vec(&self).map_err_to_code(
ErrorCode::Internal,
|| "Logical error: cannot serialize SetPriority.",
)
}
}

#[derive(Clone, Debug)]
pub enum FlightAction {
InitQueryFragmentsPlan(InitQueryFragmentsPlan),
InitNodesChannel(InitNodesChannel),
ExecutePartialQuery(String),
TruncateTable(TruncateTable),
KillQuery(KillQuery),
SetPriority(SetPriority),
}

impl TryInto<FlightAction> for Action {
Expand All @@ -158,6 +186,7 @@ impl TryInto<FlightAction> for Action {
},
"TruncateTable" => Ok(FlightAction::TruncateTable(self.body.try_into()?)),
"KillQuery" => Ok(FlightAction::KillQuery(self.body.try_into()?)),
"SetPriority" => Ok(FlightAction::SetPriority(self.body.try_into()?)),
un_implemented => Err(Status::unimplemented(format!(
"UnImplement action {}",
un_implemented
Expand Down Expand Up @@ -191,6 +220,10 @@ impl TryInto<Action> for FlightAction {
r#type: String::from("KillQuery"),
body: kill_query.try_into()?,
}),
FlightAction::SetPriority(set_priority) => Ok(Action {
r#type: String::from("SetPriority"),
body: set_priority.try_into()?,
}),
}
}
}
1 change: 1 addition & 0 deletions src/query/service/src/servers/flight/v1/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ pub use flight_actions::FlightAction;
pub use flight_actions::InitNodesChannel;
pub use flight_actions::InitQueryFragmentsPlan;
pub use flight_actions::KillQuery;
pub use flight_actions::SetPriority;
pub use flight_actions::TruncateTable;
14 changes: 14 additions & 0 deletions src/query/service/src/servers/flight/v1/flight_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use tonic::Streaming;

use crate::interpreters::Interpreter;
use crate::interpreters::KillInterpreter;
use crate::interpreters::SetPriorityInterpreter;
use crate::interpreters::TruncateTableInterpreter;
use crate::servers::flight::request_builder::RequestGetter;
use crate::servers::flight::v1::actions::FlightAction;
Expand Down Expand Up @@ -250,6 +251,19 @@ impl FlightService for DatabendQueryFlightService {
interpreter.execute2().await?;
FlightResult { body: vec![] }
}
FlightAction::SetPriority(set_priority) => {
let config = GlobalConfig::instance();
let session_manager = SessionManager::instance();
let settings = Settings::create(config.query.tenant_id.clone());
let session =
session_manager.create_with_settings(SessionType::FlightRPC, settings)?;
let ctx = session.create_query_context().await?;

let interpreter =
SetPriorityInterpreter::from_flight(ctx, set_priority.packet)?;
interpreter.execute2().await?;
FlightResult { body: vec![] }
}
};

Ok(RawResponse::new(
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/servers/flight/v1/packets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod packet_executor;
mod packet_fragment;
mod packet_kill_query;
mod packet_publisher;
mod packet_set_priority;
mod packet_truncate_table;

pub use packet::Packet;
Expand All @@ -32,4 +33,5 @@ pub use packet_fragment::FragmentPlanPacket;
pub use packet_kill_query::KillQueryPacket;
pub use packet_publisher::ConnectionInfo;
pub use packet_publisher::InitNodesChannelPacket;
pub use packet_set_priority::SetPriorityPacket;
pub use packet_truncate_table::TruncateTablePacket;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_config::InnerConfig;
use databend_common_exception::Result;
use databend_common_meta_types::NodeInfo;

use crate::servers::flight::v1::actions::FlightAction;
use crate::servers::flight::v1::actions::SetPriority;
use crate::servers::flight::v1::packets::packet::create_client;
use crate::servers::flight::v1::packets::Packet;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct SetPriorityPacket {
pub id: String,
pub priority: u8,
pub executor: Arc<NodeInfo>,
}

impl SetPriorityPacket {
pub fn create(id: String, priority: u8, executor: Arc<NodeInfo>) -> SetPriorityPacket {
SetPriorityPacket {
id,
priority,
executor,
}
}
}

#[async_trait::async_trait]
impl Packet for SetPriorityPacket {
#[async_backtrace::framed]
async fn commit(&self, config: &InnerConfig, timeout: u64) -> Result<()> {
let executor_info = &self.executor;
let mut conn = create_client(config, &executor_info.flight_address).await?;
let action = FlightAction::SetPriority(SetPriority {
packet: self.clone(),
});
conn.execute_action(action, timeout).await
}
}
14 changes: 7 additions & 7 deletions tests/suites/1_stateful/02_query/02_0006_set_priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@
)
time.sleep(0.5)

mycursor = mydb.cursor()
mycursor = mydb.cursor(buffered=True)
mycursor.execute(
"SELECT id FROM system.processes WHERE extra_info LIKE '%SELECT max(number)%' AND extra_info NOT LIKE '%system.processes%';"
)
res = mycursor.fetchone()
query_id = str(res[0])

adjust_priority_query = "set priority high '" + str(res[0]) + "';"
adjust_priority_query = "set priority high '" + query_id + "';"
mycursor.execute(adjust_priority_query)

res = mycursor.fetchone()
time.sleep(0.2)

kill_query = "kill query '" + str(res[0]) + "';"
kill_query = "kill query '" + query_id + "';"
mycursor.execute(kill_query)
time.sleep(0.1)

client1.expect(prompt)

res = mycursor.fetchone()

0 comments on commit 94d8780

Please sign in to comment.