feat: try enable parquet encoding #15221

Draft · wants to merge 14 commits into base: main
3 changes: 3 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

8 changes: 7 additions & 1 deletion src/query/ee/src/storages/fuse/operations/virtual_columns.rs
@@ -183,7 +183,13 @@ async fn materialize_virtual_columns(
     let virtual_block = DataBlock::new(virtual_columns, len);

     let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-    let _ = serialize_block(write_settings, &virtual_schema, virtual_block, &mut buffer)?;
+    let _ = serialize_block(
+        write_settings,
+        &virtual_schema,
+        virtual_block,
+        &mut buffer,
+        &Default::default(),
+    )?;

     write_data(buffer, operator, location).await?;

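The new trailing argument threads per-column statistics into the block writer; call sites without statistics opt out via &Default::default(). As a reading aid, a hedged sketch of the signature these call sites imply. The types are assumed from the call sites, the return type is inferred from the col_metas bindings further down, and the real definition lives in the fuse storage layer, which this page does not show.

// Implied shape only; parameter and return types are assumptions based on
// the call sites in this PR, not the actual definition.
pub fn serialize_block(
    write_settings: &WriteSettings,
    schema: &TableSchemaRef,
    block: DataBlock,
    buf: &mut Vec<u8>,
    col_stats: &StatisticsOfColumns, // new in this PR; drives encoding choice
) -> Result<HashMap<ColumnId, ColumnMeta>> {
    unimplemented!()
}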
5 changes: 2 additions & 3 deletions src/query/expression/src/converts/arrow/to.rs
@@ -170,8 +170,7 @@ impl DataBlock {
 impl Column {
     pub fn into_arrow_rs(self) -> Arc<dyn arrow_array::Array> {
         let arrow2_array: Box<dyn databend_common_arrow::arrow::array::Array> = self.as_arrow();
-        let arrow_array: Arc<dyn arrow_array::Array> = arrow2_array.into();
-        arrow_array
+        arrow2_array.into()
     }
 }

@@ -200,7 +199,7 @@ fn arrow_field_from_arrow2_field(field: Arrow2Field) -> ArrowField
         Arrow2DataType::Struct(f) => {
             ArrowDataType::Struct(f.into_iter().map(arrow_field_from_arrow2_field).collect())
         }
-        other => other.into(),
+        other => ArrowDataType::from(other),
     };

     ArrowField::new(field.name, data_type, field.is_nullable).with_metadata(metadata)
8 changes: 7 additions & 1 deletion src/query/formats/src/output_format/parquet.rs
@@ -53,7 +53,13 @@ impl OutputFormat for ParquetOutputFormat {
             return Ok(vec![]);
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, TableCompression::Zstd)?;
+        let _ = blocks_to_parquet(
+            &self.schema,
+            blocks,
+            &mut buf,
+            TableCompression::Zstd,
+            &Default::default(),
+        )?;
         Ok(buf)
     }
 }
3 changes: 2 additions & 1 deletion src/query/service/src/test_kits/block_writer.rs
@@ -75,7 +75,7 @@ impl<'a> BlockWriter<'a> {
         };

         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let col_metas = serialize_block(&write_settings, schema, block, &mut buf)?;
+        let col_metas = serialize_block(&write_settings, schema, block, &mut buf, &col_stats)?;
         let file_size = buf.len() as u64;

         data_accessor.write(&location.0, buf).await?;
@@ -126,6 +126,7 @@ impl<'a> BlockWriter<'a> {
             vec![index_block],
             &mut data,
             TableCompression::None,
+            &Default::default(),
         )?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
@@ -760,7 +760,8 @@ impl CompactSegmentTestFixture {
         };

         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
-        let col_metas = serialize_block(&write_settings, &schema, block, &mut buf)?;
+        let col_metas =
+            serialize_block(&write_settings, &schema, block, &mut buf, &col_stats)?;
         let file_size = buf.len() as u64;

         data_accessor.write(&location.0, buf).await?;
6 changes: 4 additions & 2 deletions src/query/storages/common/blocks/Cargo.toml
@@ -12,10 +12,12 @@ doctest = false
 test = true

 [dependencies]
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-schema = { workspace = true }
 databend-common-exception = { path = "../../../../common/exception" }
 databend-common-expression = { path = "../../../expression" }
-parquet_rs = { workspace = true }

 databend-storages-common-table-meta = { path = "../table_meta" }
+parquet_rs = { workspace = true }

 [build-dependencies]
90 changes: 90 additions & 0 deletions src/query/storages/common/blocks/src/codec/byte_array.rs
@@ -0,0 +1,90 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow_array::types::ByteArrayType;
use arrow_array::types::LargeBinaryType;
use arrow_array::types::LargeUtf8Type;
use arrow_array::Array;
use arrow_array::GenericByteArray;
use arrow_schema::DataType as ArrowDataType;
use databend_common_exception::Result;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use parquet_rs::basic::Encoding;
use parquet_rs::file::properties::WriterPropertiesBuilder;
use parquet_rs::schema::types::ColumnPath;

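// Tuning knobs for the heuristics below:
// - prefer dictionary encoding when each distinct value repeats more than
//   ROWS_PER_DISTINCT_THRESHOLD times on average;
// - probe at most SAMPLE_ROWS adjacent pairs when estimating shared prefixes;
// - require an average shared prefix longer than AVERAGE_PREFIX_LEN_THRESHOLD
//   bytes before choosing DELTA_BYTE_ARRAY.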
const ROWS_PER_DISTINCT_THRESHOLD: f64 = 10.0;
const SAMPLE_ROWS: usize = 1000;
const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0;

pub fn choose_byte_array_encoding(
    mut props: WriterPropertiesBuilder,
    stat: Option<&ColumnStatistics>,
    array: Arc<dyn Array>,
    column_name: &str,
) -> Result<WriterPropertiesBuilder> {
    if array.is_empty() {
        return Ok(props);
    }
    let col_path = ColumnPath::new(vec![column_name.to_string()]);
    let ndv = stat.as_ref().and_then(|s| s.distinct_of_values);
    let num_rows = array.len();
    if let Some(ndv) = ndv {
        if num_rows as f64 / ndv as f64 > ROWS_PER_DISTINCT_THRESHOLD {
            props = props.set_column_dictionary_enabled(col_path, true);
            return Ok(props);
        }
    }
    let data_type = array.data_type();
    match data_type {
        ArrowDataType::LargeBinary => {
            if can_apply_delta_byte_array::<LargeBinaryType>(&array)? {
                props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
                return Ok(props);
            }
        }
        ArrowDataType::LargeUtf8 => {
            if can_apply_delta_byte_array::<LargeUtf8Type>(&array)? {
                props = props.set_column_encoding(col_path, Encoding::DELTA_BYTE_ARRAY);
                return Ok(props);
            }
        }
        _ => {}
    };
    props = props.set_column_encoding(col_path, Encoding::DELTA_LENGTH_BYTE_ARRAY);
    Ok(props)
}

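/// Probe up to SAMPLE_ROWS adjacent pairs and estimate the average byte
/// prefix each value shares with its predecessor. DELTA_BYTE_ARRAY only pays
/// off when adjacent values share long prefixes (e.g. sorted URLs or paths).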
fn can_apply_delta_byte_array<T: ByteArrayType>(array: &dyn Array) -> Result<bool> {
    let num_rows = array.len();
    let array = array
        .as_any()
        .downcast_ref::<GenericByteArray<T>>()
        .unwrap();
    let mut sum_prefix_len = 0;
    for i in 1..num_rows.min(SAMPLE_ROWS) {
        let last: &[u8] = array.value(i - 1).as_ref();
        let cur: &[u8] = array.value(i).as_ref();
        let prefix_len = last
            .iter()
            .zip(cur.iter())
            .take_while(|(a, b)| a == b)
            .count();
        sum_prefix_len += prefix_len;
    }
    // Average over the pairs actually sampled; dividing by the full row count
    // would dilute the estimate for arrays larger than SAMPLE_ROWS.
    let sampled_pairs = num_rows.min(SAMPLE_ROWS).saturating_sub(1).max(1);
    let avg_prefix_len = sum_prefix_len as f64 / sampled_pairs as f64;
    Ok(avg_prefix_len > AVERAGE_PREFIX_LEN_THRESHOLD)
}
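Aside (not part of the diff): the same prefix heuristic, sketched as a standalone program over plain strings so the decision boundary is easy to see. Constant names mirror this file; the real code runs over arrow GenericByteArray values.

const SAMPLE_ROWS: usize = 1000;
const AVERAGE_PREFIX_LEN_THRESHOLD: f64 = 8.0;

/// Average shared byte prefix of adjacent values, over a bounded sample.
fn avg_shared_prefix(values: &[&str]) -> f64 {
    let n = values.len().min(SAMPLE_ROWS);
    if n < 2 {
        return 0.0;
    }
    let sum: usize = values
        .windows(2)
        .take(n - 1)
        .map(|w| {
            w[0].bytes()
                .zip(w[1].bytes())
                .take_while(|(a, b)| a == b)
                .count()
        })
        .sum();
    sum as f64 / (n - 1) as f64
}

fn main() {
    // Sorted URL-like values share long prefixes: DELTA_BYTE_ARRAY wins.
    let urls = ["https://example.com/a", "https://example.com/b"];
    assert!(avg_shared_prefix(&urls) > AVERAGE_PREFIX_LEN_THRESHOLD);
    // Unrelated values share almost nothing: fall back to DELTA_LENGTH_BYTE_ARRAY.
    let words = ["alpha", "bravo", "charlie"];
    assert!(avg_shared_prefix(&words) <= AVERAGE_PREFIX_LEN_THRESHOLD);
}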
60 changes: 60 additions & 0 deletions src/query/storages/common/blocks/src/codec/choose.rs
@@ -0,0 +1,60 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use parquet_rs::basic::Type as PhysicalType;
use parquet_rs::file::properties::WriterPropertiesBuilder;
use parquet_rs::schema::types::Type;
use parquet_rs::schema::types::TypePtr;

use super::byte_array::choose_byte_array_encoding;
use super::int::choose_int_encoding;

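/// Walk the parquet leaf fields zipped with the table schema and the block's
/// columns, recording a per-column encoding choice on the properties builder.
/// Only primitive BYTE_ARRAY / INT32 / INT64 leaves are considered; group
/// (nested) fields keep the writer defaults for now.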
pub fn choose_codec(
    mut props: WriterPropertiesBuilder,
    block: &DataBlock,
    parquet_fields: &[TypePtr],
    table_schema: &TableSchema,
    stat: &StatisticsOfColumns,
) -> Result<WriterPropertiesBuilder> {
    for ((parquet_field, table_field), entry) in parquet_fields
        .iter()
        .zip(table_schema.fields.iter())
        .zip(block.columns())
    {
        let column = entry.to_column(block.num_rows());
        let array = column.into_arrow_rs();
        let stat = stat.get(&table_field.column_id);
        let column_name = table_field.name.as_str();
        match parquet_field.as_ref() {
            Type::PrimitiveType { physical_type, .. } => match physical_type {
                PhysicalType::BYTE_ARRAY => {
                    props = choose_byte_array_encoding(props, stat, array, column_name)?;
                }
                PhysicalType::INT32 | PhysicalType::INT64 => {
                    props = choose_int_encoding(props, stat, array, column_name)?
                }
                _ => {}
            },
            Type::GroupType {
                basic_info: _,
                fields: _,
            } => {} // TODO: handle nested fields
        }
    }
    Ok(props)
}
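Aside (not part of the diff): a hedged sketch of how a write path could feed choose_codec, assuming choose_codec is in scope (it is crate-private here) and that the caller already has the parquet schema fields and column statistics at hand. The actual hookup happens elsewhere in this PR.

use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use parquet_rs::file::properties::WriterProperties;
use parquet_rs::schema::types::TypePtr;

// Hypothetical helper: run the chooser over one block, then freeze the
// builder into WriterProperties for the parquet writer.
fn build_writer_properties(
    block: &DataBlock,
    parquet_fields: &[TypePtr],
    table_schema: &TableSchema,
    stats: &StatisticsOfColumns,
) -> Result<WriterProperties> {
    let mut builder = WriterProperties::builder();
    builder = choose_codec(builder, block, parquet_fields, table_schema, stats)?;
    Ok(builder.build())
}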
85 changes: 85 additions & 0 deletions src/query/storages/common/blocks/src/codec/int.rs
@@ -0,0 +1,85 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use arrow_array::types::ArrowPrimitiveType;
use arrow_array::types::Date32Type;
use arrow_array::types::Decimal128Type;
use arrow_array::types::Decimal256Type;
use arrow_array::types::Int32Type;
use arrow_array::types::Int64Type;
use arrow_array::types::TimestampMicrosecondType;
use arrow_array::types::UInt32Type;
use arrow_array::types::UInt64Type;
use arrow_array::Array;
use arrow_array::ArrowNativeTypeOp;
use arrow_array::PrimitiveArray;
use arrow_buffer::ArrowNativeType;
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::TimeUnit;
use databend_common_exception::Result;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use parquet_rs::basic::Encoding;
use parquet_rs::file::properties::WriterPropertiesBuilder;
use parquet_rs::schema::types::ColumnPath;

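// DELTA_BINARY_PACKED is picked only when the range between the largest and
// smallest consecutive delta is at most 2^MAX_WIDTH_THRESHOLD = 8, i.e. each
// delta fits in a few bits.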
const MAX_WIDTH_THRESHOLD: i64 = 3;

pub fn choose_int_encoding(
    mut props: WriterPropertiesBuilder,
    _stat: Option<&ColumnStatistics>,
    array: Arc<dyn Array>,
    column_name: &str,
) -> Result<WriterPropertiesBuilder> {
    if array.is_empty() {
        return Ok(props);
    }
    let col_path = ColumnPath::new(vec![column_name.to_string()]);
    let data_type = array.data_type();
    let apply_delta = match data_type {
        ArrowDataType::Int32 => can_apply_delta_binary_pack::<Int32Type>(&array)?,
        ArrowDataType::Int64 => can_apply_delta_binary_pack::<Int64Type>(&array)?,
        ArrowDataType::UInt32 => can_apply_delta_binary_pack::<UInt32Type>(&array)?,
        ArrowDataType::UInt64 => can_apply_delta_binary_pack::<UInt64Type>(&array)?,
        ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => {
            can_apply_delta_binary_pack::<TimestampMicrosecondType>(&array)?
        }
        ArrowDataType::Date32 => can_apply_delta_binary_pack::<Date32Type>(&array)?,
        ArrowDataType::Decimal128(_, _) => can_apply_delta_binary_pack::<Decimal128Type>(&array)?,
        ArrowDataType::Decimal256(_, _) => can_apply_delta_binary_pack::<Decimal256Type>(&array)?,
        _ => false,
    };
    if apply_delta {
        props = props.set_column_encoding(col_path, Encoding::DELTA_BINARY_PACKED);
    }
    Ok(props)
}

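/// Scan consecutive deltas, tracking their min and max with total-order
/// comparisons; wrapping subtraction avoids overflow at the extremes. If the
/// whole delta range fits within 2^MAX_WIDTH_THRESHOLD, DELTA_BINARY_PACKED
/// can store each value in just a few bits.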
fn can_apply_delta_binary_pack<T: ArrowPrimitiveType>(array: &dyn Array) -> Result<bool> {
    let mut max_delta = T::Native::MIN_TOTAL_ORDER;
    let mut min_delta = T::Native::MAX_TOTAL_ORDER;
    let array = array.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
    for i in 1..array.len() {
        let delta = array.value(i).sub_wrapping(array.value(i - 1));
        if delta.is_gt(max_delta) {
            max_delta = delta;
        }
        if delta.is_lt(min_delta) {
            min_delta = delta;
        }
    }
    let delta_range = max_delta.sub_wrapping(min_delta).as_usize();
    Ok(delta_range <= (1 << MAX_WIDTH_THRESHOLD))
}
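Aside (not part of the diff): the same delta-range probe as a standalone program over i64 values, with one input that qualifies and one that does not. The real version is generic over arrow primitive types.

const MAX_WIDTH_THRESHOLD: i64 = 3;

/// True when consecutive deltas fall in a range of at most 2^MAX_WIDTH_THRESHOLD.
fn narrow_delta_range(values: &[i64]) -> bool {
    if values.len() < 2 {
        return false;
    }
    let deltas: Vec<i64> = values.windows(2).map(|w| w[1].wrapping_sub(w[0])).collect();
    let max = *deltas.iter().max().unwrap();
    let min = *deltas.iter().min().unwrap();
    max.wrapping_sub(min) as usize <= (1 << MAX_WIDTH_THRESHOLD)
}

fn main() {
    // Timestamps ticking at a near-constant interval: deltas cluster tightly,
    // so DELTA_BINARY_PACKED encodes each value in a few bits.
    let ts = [1_000_000i64, 2_000_001, 3_000_000, 4_000_002];
    assert!(narrow_delta_range(&ts));
    // Scattered values: the delta range is huge, so the default encoding stays.
    let scattered = [42i64, 7_000, -3, 999_999];
    assert!(!narrow_delta_range(&scattered));
}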
19 changes: 19 additions & 0 deletions src/query/storages/common/blocks/src/codec/mod.rs
@@ -0,0 +1,19 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod byte_array;
mod choose;
mod int;

pub use choose::choose_codec;
1 change: 1 addition & 0 deletions src/query/storages/common/blocks/src/lib.rs
@@ -14,5 +14,6 @@

#![allow(clippy::uninlined_format_args)]

mod codec;
mod parquet_rs;
pub use parquet_rs::blocks_to_parquet;