Skip to content

Commit

Permalink
test(spanner): fix failed TestRsdBlockingStates test (#2597)
Browse files Browse the repository at this point in the history
  • Loading branch information
hengfengli committed Jul 14, 2020
1 parent 1257173 commit d1ef23f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 26 deletions.
54 changes: 28 additions & 26 deletions spanner/internal/testutil/inmem_spanner_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,33 +117,35 @@ func (s *StatementResult) ToPartialResultSets(resumeToken []byte) (result []*spa
}

totalRows := uint64(len(s.ResultSet.Rows))
for {
rowCount := min(totalRows-startIndex, uint64(MaxRowsPerPartialResultSet))
rows := s.ResultSet.Rows[startIndex : startIndex+rowCount]
values := make([]*structpb.Value,
len(rows)*len(s.ResultSet.Metadata.RowType.Fields))
var idx int
for _, row := range rows {
for colIdx := range s.ResultSet.Metadata.RowType.Fields {
values[idx] = row.Values[colIdx]
idx++
if totalRows > 0 {
for {
rowCount := min(totalRows-startIndex, uint64(MaxRowsPerPartialResultSet))
rows := s.ResultSet.Rows[startIndex : startIndex+rowCount]
values := make([]*structpb.Value,
len(rows)*len(s.ResultSet.Metadata.RowType.Fields))
var idx int
for _, row := range rows {
for colIdx := range s.ResultSet.Metadata.RowType.Fields {
values[idx] = row.Values[colIdx]
idx++
}
}
var rt []byte
if len(s.ResumeTokens) == 0 {
rt = EncodeResumeToken(startIndex + rowCount)
} else {
rt = s.ResumeTokens[startIndex]
}
result = append(result, &spannerpb.PartialResultSet{
Metadata: s.ResultSet.Metadata,
Values: values,
ResumeToken: rt,
})

startIndex += rowCount
if startIndex == totalRows {
break
}
}
var rt []byte
if len(s.ResumeTokens) == 0 {
rt = EncodeResumeToken(startIndex + rowCount)
} else {
rt = s.ResumeTokens[startIndex]
}
result = append(result, &spannerpb.PartialResultSet{
Metadata: s.ResultSet.Metadata,
Values: values,
ResumeToken: rt,
})

startIndex += rowCount
if startIndex == totalRows {
break
}
}
return result, nil
Expand Down
6 changes: 6 additions & 0 deletions spanner/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -1140,14 +1141,17 @@ func TestRsdBlockingStates(t *testing.T) {
if err != nil {
t.Fatalf("failed to set up a result for a statement: %v", err)
}
var mutex = &sync.Mutex{}
var rs []*sppb.PartialResultSet
go func() {
for {
if !r.next() {
// Note that r.Next also exits on context cancel/timeout.
return
}
mutex.Lock()
rs = append(rs, r.get())
mutex.Unlock()
}
}()
// Verify that resumableStreamDecoder reaches expected state.
Expand All @@ -1160,6 +1164,8 @@ func TestRsdBlockingStates(t *testing.T) {
}
// Check if resumableStreamDecoder returns expected array of
// PartialResultSets.
mutex.Lock()
defer mutex.Unlock()
if !testEqual(rs, test.want) {
t.Fatalf("received PartialResultSets: \n%v\n, want \n%v\n", rs, test.want)
}
Expand Down

0 comments on commit d1ef23f

Please sign in to comment.