Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CosmosDBState - Add, as option, the partitionKey in QueryMethod #3227

Merged
merged 6 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions state/azure/cosmosdb/cosmosdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ func (p crossPartitionQueryPolicy) Do(req *policy.Request) (*http.Response, erro
// Check if we're performing a query
// In that case, remove the partitionkey header and enable cross-partition queries
if strings.ToLower(raw.Header.Get("x-ms-documentdb-query")) == "true" {
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true")
raw.Header.Del("x-ms-documentdb-partitionkey")
// Only when the partitionKey is fake (true), it will be removed amd enabled the cross partition
if strings.ToLower(raw.Header.Get("x-ms-documentdb-partitionkey")) == "[true]" {
raw.Header.Add("x-ms-documentdb-query-enablecrosspartition", "true")
raw.Header.Del("x-ms-documentdb-partitionkey")
berndverst marked this conversation as resolved.
Show resolved Hide resolved
}
}
return req.Next()
}
Expand Down Expand Up @@ -206,6 +209,7 @@ func (c *StateStore) Features() []state.Feature {
state.FeatureTransactional,
state.FeatureQueryAPI,
state.FeatureTTL,
state.FeaturePartitionKey,
berndverst marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -594,6 +598,12 @@ func (c *StateStore) Query(ctx context.Context, req *state.QueryRequest) (*state
return &state.QueryResponse{}, err
}

// If present partitionKey, the value will be used in the query disabling the cross partition
q.partitionKey = ""
if val, found := req.Metadata[metadataPartitionKey]; found {
q.partitionKey = val
}

data, token, err := q.execute(ctx, c.client)
if err != nil {
return nil, err
Expand Down
16 changes: 12 additions & 4 deletions state/azure/cosmosdb/cosmosdb_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ type InternalQuery struct {
}

type Query struct {
query InternalQuery
limit int
token string
query InternalQuery
limit int
token string
partitionKey string
}

func (q *Query) VisitEQ(f *query.EQ) (string, error) {
Expand Down Expand Up @@ -264,7 +265,14 @@ func (q *Query) execute(ctx context.Context, client *azcosmos.ContainerClient) (
}

items := []CosmosItem{}
pk := azcosmos.NewPartitionKeyBool(true)

var pk azcosmos.PartitionKey
if q.partitionKey != "" {
pk = azcosmos.NewPartitionKeyString(q.partitionKey)
} else {
pk = azcosmos.NewPartitionKeyBool(true)
}

queryPager := client.NewQueryItemsPager(q.query.query, pk, opts)

token := ""
Expand Down
2 changes: 2 additions & 0 deletions state/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
FeatureTTL Feature = "TTL"
// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
// FeaturePartitionKey is the feature that supports the partition
FeaturePartitionKey Feature = "PARTITION_KEY"
berndverst marked this conversation as resolved.
Show resolved Hide resolved
)

// Feature names a feature that can be implemented by state store components.
Expand Down
97 changes: 89 additions & 8 deletions tests/conformance/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type StructType struct {
Product struct {
Value int `json:"value"`
} `json:"product"`
Status string `json:"status"`
Status string `json:"status"`
Message string `json:"message"`
}

type intValueType struct {
Expand All @@ -62,8 +63,10 @@ type scenario struct {
}

type queryScenario struct {
query string
results []state.QueryItem
query string
results []state.QueryItem
metadata map[string]string
partitionOnly bool
}

type TestConfig struct {
Expand Down Expand Up @@ -130,14 +133,19 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
key: fmt.Sprintf("%s-struct-operations", key),
value: StructType{Product: struct {
Value int `json:"value"`
}{Value: 15}, Status: "ACTIVE"},
}{Value: 15}, Status: "ACTIVE", Message: fmt.Sprintf("%smessage", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-operations-inactive", key),
value: StructType{Product: struct {
Value int `json:"value"`
}{Value: 12}, Status: "INACTIVE"},
}{Value: 12}, Status: "INACTIVE", Message: fmt.Sprintf("%smessage", key)},
contentType: contenttype.JSONContentType,
},
{
key: fmt.Sprintf("%s-struct-2", key),
value: ValueType{Message: fmt.Sprintf("%stest", key)},
contentType: contenttype.JSONContentType,
},
{
Expand Down Expand Up @@ -261,6 +269,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
{
"filter": {
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"GTE": {"product.value": 10}
},
Expand All @@ -277,7 +288,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-operations", key),
Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE"}`)),
Data: []byte(fmt.Sprintf(`{"product":{"value":15}, "status":"ACTIVE","message":"%smessage"}`, key)),
},
},
},
Expand All @@ -288,6 +299,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
"OR": [
{
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"GT": {"product.value": 11.1}
},
Expand All @@ -298,6 +312,9 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
},
{
"AND": [
{
"EQ": {"message": "` + key + `message"}
},
{
"LTE": {"product.value": 0.5}
},
Expand All @@ -313,7 +330,65 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-operations-inactive", key),
Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE"}`)),
Data: []byte(fmt.Sprintf(`{"product":{"value":12}, "status":"INACTIVE","message":"%smessage"}`, key)),
},
},
},

// In CosmosDB this query uses the cross partition ( value with 2 different partitionKey <key>-struct and <key>-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
// Return 2 item from 2 different partitionKey (<key>-struct and <key>-struct-2), for default the partitionKey is equals to key
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct", key),
Data: []byte(fmt.Sprintf(`{"message":"test%s"}`, key)),
},
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},

// Test for CosmosDB (filter test with partitionOnly) this query doesn't use the cross partition ( value from 2 different partitionKey %s-struct and %s-struct-2)
{
query: `
{
"filter": {
"OR": [
{
"EQ": {"message": "` + key + `test"}
},
{
"IN": {"message": ["test` + key + `", "dummy"]}
}
]
}
}
`,
metadata: map[string]string{
"partitionKey": fmt.Sprintf("%s-struct-2", key),
},
partitionOnly: true,
// The same query from previous test but return only item having the same partitionKey value (%s-struct-2) given in the metadata
results: []state.QueryItem{
{
Key: fmt.Sprintf("%s-struct-2", key),
Data: []byte(fmt.Sprintf(`{"message":"%stest"}`, key)),
},
},
},
Expand Down Expand Up @@ -382,10 +457,12 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
// Check if query feature is listed
features := statestore.Features()
require.True(t, state.FeatureQueryAPI.IsPresent(features))

querier, ok := statestore.(state.Querier)
assert.True(t, ok, "Querier interface is not implemented")
for _, scenario := range queryScenarios {
if (scenario.partitionOnly) && (!state.FeaturePartitionKey.IsPresent(features)) {
break
}
berndverst marked this conversation as resolved.
Show resolved Hide resolved
t.Logf("Querying value presence for %s", scenario.query)
var req state.QueryRequest
err := json.Unmarshal([]byte(scenario.query), &req.Query)
Expand All @@ -395,6 +472,10 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
metadata.QueryIndexName: "qIndx",
}

if val, found := scenario.metadata["partitionKey"]; found {
req.Metadata["partitionKey"] = val
}

resp, err := querier.Query(context.Background(), &req)
require.NoError(t, err)
assert.Equal(t, len(scenario.results), len(resp.Results))
Expand Down