Skip to content

Commit

Permalink
feat(bigquery): RANGE support when reading Arrow format (#9795)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx committed Apr 18, 2024
1 parent 62d6ecf commit da245fa
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 62 deletions.
15 changes: 15 additions & 0 deletions bigquery/arrow.go
Expand Up @@ -272,6 +272,21 @@ func convertArrowValue(col arrow.Array, i int, ft arrow.DataType, fs *FieldSchem
arr := col.(*array.Struct)
nestedValues := []Value{}
fields := ft.(*arrow.StructType).Fields()
if fs.Type == RangeFieldType {
rangeFieldSchema := &FieldSchema{
Type: fs.RangeElementType.Type,
}
start, err := convertArrowValue(arr.Field(0), i, fields[0].Type, rangeFieldSchema)
if err != nil {
return nil, err
}
end, err := convertArrowValue(arr.Field(1), i, fields[1].Type, rangeFieldSchema)
if err != nil {
return nil, err
}
rangeValue := &RangeValue{Start: start, End: end}
return Value(rangeValue), nil
}
for fIndex, f := range fields {
v, err := convertArrowValue(arr.Field(fIndex), i, f.Type, fs.Schema[fIndex])
if err != nil {
Expand Down
89 changes: 42 additions & 47 deletions bigquery/integration_test.go
Expand Up @@ -2154,7 +2154,7 @@ var (
queryParameterTestCases = []queryParameterTestCase{}
)

func initQueryParameterTestCases(includeRangeCases bool) {
func initQueryParameterTestCases() {
d := civil.Date{Year: 2016, Month: 3, Day: 20}
tm := civil.Time{Hour: 15, Minute: 04, Second: 05, Nanosecond: 3008}
rtm := tm
Expand Down Expand Up @@ -2276,6 +2276,46 @@ func initQueryParameterTestCases(includeRangeCases bool) {
[]Value{"{\"alpha\":\"beta\"}"},
"{\"alpha\":\"beta\"}",
},
{
"RangeUnboundedStart",
"SELECT @val",
[]QueryParameter{
{
Name: "val",
Value: &QueryParameterValue{
Type: StandardSQLDataType{
TypeKind: "RANGE",
RangeElementType: &StandardSQLDataType{
TypeKind: "TIMESTAMP",
},
},
Value: rangeTimestamp1,
},
},
},
[]Value{rangeTimestamp1},
rangeTimestamp1,
},
{
"RangeUnboundedEnd",
"SELECT @val",
[]QueryParameter{
{
Name: "val",
Value: &QueryParameterValue{
Type: StandardSQLDataType{
TypeKind: "RANGE",
RangeElementType: &StandardSQLDataType{
TypeKind: "TIMESTAMP",
},
},
Value: rangeTimestamp2,
},
},
},
[]Value{rangeTimestamp2},
rangeTimestamp2,
},
{
"NestedStructParam",
"SELECT @val",
Expand Down Expand Up @@ -2448,51 +2488,6 @@ func initQueryParameterTestCases(includeRangeCases bool) {
},
},
}

if includeRangeCases {
queryParameterTestCases = append(queryParameterTestCases, []queryParameterTestCase{
{
"RangeUnboundedEnd",
"SELECT @val",
[]QueryParameter{
{
Name: "val",
Value: &QueryParameterValue{
Type: StandardSQLDataType{
TypeKind: "RANGE",
RangeElementType: &StandardSQLDataType{
TypeKind: "TIMESTAMP",
},
},
Value: rangeTimestamp1,
},
},
},
[]Value{rangeTimestamp1},
rangeTimestamp1,
},
{
"RangeUnboundedStart",
"SELECT @val",
[]QueryParameter{
{
Name: "val",
Value: &QueryParameterValue{
Type: StandardSQLDataType{
TypeKind: "RANGE",
RangeElementType: &StandardSQLDataType{
TypeKind: "TIMESTAMP",
},
},
Value: rangeTimestamp2,
},
},
},
[]Value{rangeTimestamp2},
rangeTimestamp2,
},
}...)
}
}

func TestIntegration_QueryParameters(t *testing.T) {
Expand All @@ -2501,7 +2496,7 @@ func TestIntegration_QueryParameters(t *testing.T) {
}
ctx := context.Background()

initQueryParameterTestCases(true)
initQueryParameterTestCases()

for _, tc := range queryParameterTestCases {
t.Run(tc.name, func(t *testing.T) {
Expand Down
32 changes: 17 additions & 15 deletions bigquery/storage_integration_test.go
Expand Up @@ -37,23 +37,25 @@ func TestIntegration_StorageReadBasicTypes(t *testing.T) {
}
ctx := context.Background()

initQueryParameterTestCases(false)
initQueryParameterTestCases()

for _, c := range queryParameterTestCases {
q := storageOptimizedClient.Query(c.query)
q.Parameters = c.parameters
q.forceStorageAPI = true
it, err := q.Read(ctx)
if err != nil {
t.Fatal(err)
}
err = checkIteratorRead(it, c.wantRow)
if err != nil {
t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err)
}
if !it.IsAccelerated() {
t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID())
}
t.Run(c.name, func(t *testing.T) {
q := storageOptimizedClient.Query(c.query)
q.Parameters = c.parameters
q.forceStorageAPI = true
it, err := q.Read(ctx)
if err != nil {
t.Fatal(err)
}
err = checkIteratorRead(it, c.wantRow)
if err != nil {
t.Fatalf("%s: error on query `%s`[%v]: %v", it.SourceJob().ID(), c.query, c.parameters, err)
}
if !it.IsAccelerated() {
t.Fatalf("%s: expected storage api to be used", it.SourceJob().ID())
}
})
}
}

Expand Down

0 comments on commit da245fa

Please sign in to comment.