Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support adjust query's priority #15352

Merged
merged 16 commits into from
May 10, 2024
1 change: 1 addition & 0 deletions src/bendpy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ databend-common-users = { path = "../query/users" }
databend-query = { path = "../query/service", features = [
"simd",
"disable_initial_exec_tls",
"enable_queries_executor",
], default-features = false }

# # Crates.io dependencies
Expand Down
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mod notification;
mod password_policy;
mod pipe;
mod presign;
mod priority;
mod procedure;
mod replace;
mod script;
Expand Down Expand Up @@ -74,6 +75,7 @@ pub use notification::*;
pub use password_policy::*;
pub use pipe::*;
pub use presign::*;
pub use priority::*;
pub use procedure::*;
pub use replace::*;
pub use script::*;
Expand Down
35 changes: 35 additions & 0 deletions src/query/ast/src/ast/statements/priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::fmt::Formatter;

use derive_visitor::Drive;
use derive_visitor::DriveMut;
#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)]
pub enum Priority {
HIGH,
MEDIUM,
LOW,
}

impl Display for Priority {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Priority::HIGH => write!(f, "HIGH"),
Priority::MEDIUM => write!(f, "MEDIUM"),
Priority::LOW => write!(f, "LOW"),
}
}
}
15 changes: 15 additions & 0 deletions src/query/ast/src/ast/statements/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,13 @@ pub enum Statement {
// sequence
CreateSequence(CreateSequenceStmt),
DropSequence(DropSequenceStmt),

// Set priority for query
SetPriority {
priority: Priority,
#[drive(skip)]
object_id: String,
},
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -729,6 +736,14 @@ impl Display for Statement {
Statement::CreateSequence(stmt) => write!(f, "{stmt}")?,
Statement::DropSequence(stmt) => write!(f, "{stmt}")?,
Statement::CreateDynamicTable(stmt) => write!(f, "{stmt}")?,
Statement::SetPriority {
priority,
object_id,
} => {
write!(f, "SET PRIORITY")?;
write!(f, " {priority}")?;
write!(f, " '{object_id}'")?;
}
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,4 +834,5 @@ pub trait Visitor<'ast>: Sized {

fn visit_create_sequence(&mut self, _stmt: &'ast CreateSequenceStmt) {}
fn visit_drop_sequence(&mut self, _stmt: &'ast DropSequenceStmt) {}
fn visit_set_priority(&mut self, _priority: &'ast Priority, _object_id: &'ast str) {}
}
1 change: 1 addition & 0 deletions src/query/ast/src/ast/visitors/visitor_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,4 +845,5 @@ pub trait VisitorMut: Sized {

fn visit_create_sequence(&mut self, _stmt: &mut CreateSequenceStmt) {}
fn visit_drop_sequence(&mut self, _stmt: &mut DropSequenceStmt) {}
fn visit_set_priority(&mut self, _priority: &mut Priority, _object_id: &mut String) {}
}
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/visitors/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,5 +589,9 @@ pub fn walk_statement<'a, V: Visitor<'a>>(visitor: &mut V, statement: &'a Statem
Statement::CreateSequence(stmt) => visitor.visit_create_sequence(stmt),
Statement::DropSequence(stmt) => visitor.visit_drop_sequence(stmt),
Statement::CreateDynamicTable(stmt) => visitor.visit_create_dynamic_table(stmt),
Statement::SetPriority {
priority,
object_id,
} => visitor.visit_set_priority(priority, object_id),
}
}
4 changes: 4 additions & 0 deletions src/query/ast/src/ast/visitors/walk_mut.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,5 +588,9 @@ pub fn walk_statement_mut<V: VisitorMut>(visitor: &mut V, statement: &mut Statem
Statement::ExecuteImmediate(_) => {}
Statement::CreateSequence(stmt) => visitor.visit_create_sequence(stmt),
Statement::DropSequence(stmt) => visitor.visit_drop_sequence(stmt),
Statement::SetPriority {
priority,
object_id,
} => visitor.visit_set_priority(priority, object_id),
}
}
19 changes: 19 additions & 0 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,16 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
},
);

let set_priority = map(
rule! {
SET ~ PRIORITY ~ #priority ~ #parameter_to_string
},
|(_, _, priority, object_id)| Statement::SetPriority {
object_id,
priority,
},
);

let set_variable = map(
rule! {
SET ~ GLOBAL? ~ #ident ~ "=" ~ #subexpr(0)
Expand Down Expand Up @@ -2052,6 +2062,7 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
| #show_locks : "`SHOW LOCKS [IN ACCOUNT] [WHERE ...]`"
| #kill_stmt : "`KILL (QUERY | CONNECTION) <object_id>`"
| #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]"
| #set_priority: "SET PRIORITY (HIGH | MEDIUM | LOW) <object_id>"
),
// database
rule!(
Expand Down Expand Up @@ -3636,6 +3647,14 @@ pub fn kill_target(i: Input) -> IResult<KillTarget> {
))(i)
}

pub fn priority(i: Input) -> IResult<Priority> {
alt((
value(Priority::LOW, rule! { LOW }),
value(Priority::MEDIUM, rule! { MEDIUM }),
value(Priority::HIGH, rule! { HIGH }),
))(i)
}

pub fn limit_where(i: Input) -> IResult<ShowLimit> {
map(
rule! {
Expand Down
8 changes: 8 additions & 0 deletions src/query/ast/src/parser/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,8 @@ pub enum TokenKind {
GZIP,
#[token("HAVING", ignore(ascii_case))]
HAVING,
#[token("HIGH", ignore(ascii_case))]
HIGH,
#[token("HISTORY", ignore(ascii_case))]
HISTORY,
#[token("HIVE", ignore(ascii_case))]
Expand Down Expand Up @@ -736,6 +738,8 @@ pub enum TokenKind {
LIMIT,
#[token("LIST", ignore(ascii_case))]
LIST,
#[token("LOW", ignore(ascii_case))]
LOW,
#[token("LZO", ignore(ascii_case))]
LZO,
#[token("MASKING", ignore(ascii_case))]
Expand All @@ -746,6 +750,8 @@ pub enum TokenKind {
MAX_FILE_SIZE,
#[token("MASTER_KEY", ignore(ascii_case))]
MASTER_KEY,
#[token("MEDIUM", ignore(ascii_case))]
MEDIUM,
#[token("MEMO", ignore(ascii_case))]
MEMO,
#[token("MEMORY", ignore(ascii_case))]
Expand Down Expand Up @@ -862,6 +868,8 @@ pub enum TokenKind {
POSITION,
#[token("PROCESSLIST", ignore(ascii_case))]
PROCESSLIST,
#[token("PRIORITY", ignore(ascii_case))]
PRIORITY,
#[token("PURGE", ignore(ascii_case))]
PURGE,
#[token("QUARTER", ignore(ascii_case))]
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ doctest = false
test = true

[features]
default = ["simd", "z3-prove"]
default = ["simd", "z3-prove", "enable_queries_executor"]
simd = ["databend-common-arrow/simd"]

z3-prove = ["databend-common-sql/z3-prove"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ impl AccessChecker for PrivilegeAccess {
self.validate_access(&GrantObject::Global, UserPrivilegeType::Grant)
.await?;
}
Plan::SetVariable(_) | Plan::UnSetVariable(_) | Plan::Kill(_) => {
Plan::SetVariable(_) | Plan::UnSetVariable(_) | Plan::Kill(_) | Plan::SetPriority(_) => {
self.validate_access(&GrantObject::Global, UserPrivilegeType::Super)
.await?;
}
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/interpreter_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::interpreters::interpreter_notification_desc::DescNotificationInterpre
use crate::interpreters::interpreter_notification_drop::DropNotificationInterpreter;
use crate::interpreters::interpreter_presign::PresignInterpreter;
use crate::interpreters::interpreter_role_show::ShowRolesInterpreter;
use crate::interpreters::interpreter_set_priority::SetPriorityInterpreter;
use crate::interpreters::interpreter_table_create::CreateTableInterpreter;
use crate::interpreters::interpreter_table_revert::RevertTableInterpreter;
use crate::interpreters::interpreter_task_alter::AlterTaskInterpreter;
Expand Down Expand Up @@ -595,6 +596,10 @@ impl InterpreterFactory {
ctx,
*p.clone(),
)?)),
Plan::SetPriority(p) => Ok(Arc::new(SetPriorityInterpreter::try_create(
ctx,
*p.clone(),
)?)),
}
}
}
119 changes: 119 additions & 0 deletions src/query/service/src/interpreters/interpreter_set_priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_catalog::table_context::TableContext;
use databend_common_config::GlobalConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_sql::plans::SetPriorityPlan;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::packets::Packet;
use crate::servers::flight::v1::packets::SetPriorityPacket;
use crate::sessions::QueryContext;

pub struct SetPriorityInterpreter {
ctx: Arc<QueryContext>,
plan: SetPriorityPlan,
proxy_to_cluster: bool,
}

impl SetPriorityInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: SetPriorityPlan) -> Result<Self> {
Ok(SetPriorityInterpreter {
ctx,
plan,
proxy_to_cluster: true,
})
}

pub fn from_flight(ctx: Arc<QueryContext>, packet: SetPriorityPacket) -> Result<Self> {
Ok(SetPriorityInterpreter {
ctx,
plan: SetPriorityPlan {
id: packet.id,
priority: packet.priority,
},
proxy_to_cluster: false,
})
}

#[async_backtrace::framed]
async fn set_cluster_priority(&self) -> Result<PipelineBuildResult> {
let settings = self.ctx.get_settings();
let timeout = settings.get_flight_client_timeout()?;
let conf = GlobalConfig::instance();
let cluster = self.ctx.get_cluster();
for node_info in &cluster.nodes {
if node_info.id != cluster.local_id {
let set_priority_packet = SetPriorityPacket::create(
self.plan.id.clone(),
self.plan.priority,
node_info.clone(),
);

match set_priority_packet.commit(conf.as_ref(), timeout).await {
Ok(_) => {
return Ok(PipelineBuildResult::create());
}
Err(cause) => match cause.code() == ErrorCode::UNKNOWN_SESSION {
true => {
continue;
}
false => {
return Err(cause);
}
},
}
}
}
Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
self.plan.id
)))
}
}

#[async_trait::async_trait]
impl Interpreter for SetPriorityInterpreter {
fn name(&self) -> &str {
"SetPriorityInterpreter"
}

fn is_ddl(&self) -> bool {
false
}

#[async_backtrace::framed]
#[minitrace::trace]
async fn execute2(&self) -> Result<PipelineBuildResult> {
let id = &self.plan.id;
match self.ctx.get_session_by_id(id) {
None => match self.proxy_to_cluster {
true => self.set_cluster_priority().await,
false => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
},
Some(set_session) => {
set_session.set_query_priority(self.plan.priority);
Ok(PipelineBuildResult::create())
}
}
}
}
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ mod interpreter_role_show;
mod interpreter_select;
mod interpreter_sequence_create;
mod interpreter_sequence_drop;
mod interpreter_set_priority;
mod interpreter_setting;
mod interpreter_share_alter_tenants;
mod interpreter_share_create;
Expand Down Expand Up @@ -196,6 +197,7 @@ pub use interpreter_role_set_secondary::SetSecondaryRolesInterpreter;
pub use interpreter_select::SelectInterpreter;
pub use interpreter_sequence_create::CreateSequenceInterpreter;
pub use interpreter_sequence_drop::DropSequenceInterpreter;
pub use interpreter_set_priority::SetPriorityInterpreter;
pub use interpreter_setting::SettingInterpreter;
pub use interpreter_share_alter_tenants::AlterShareTenantsInterpreter;
pub use interpreter_share_create::CreateShareInterpreter;
Expand Down