Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(graph): support RedisGraph query #2969

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Expand Up @@ -25,6 +25,12 @@ jobs:
--health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 6379:6379
falkordb:
image: falkordb/falkordb:edge
options: >-
--health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 36379:6379

steps:
- name: Set up ${{ matrix.go-version }}
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/doctests.yaml
Expand Up @@ -21,6 +21,12 @@ jobs:
--health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 6379:6379
falkordb:
image: falkordb/falkordb:edge
options: >-
--health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
ports:
- 36379:6379

strategy:
fail-fast: false
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test-redis-enterprise.yml
Expand Up @@ -50,6 +50,7 @@ jobs:
run: |
go test \
--ginkgo.skip-file="ring_test.go" \
--ginkgo.skip-file="graph_test.go" \
--ginkgo.skip-file="sentinel_test.go" \
--ginkgo.skip-file="osscluster_test.go" \
--ginkgo.skip-file="pubsub_test.go" \
Expand Down
282 changes: 282 additions & 0 deletions command.go
Expand Up @@ -3,7 +3,9 @@ package redis
import (
"bufio"
"context"
"errors"
"fmt"
"math/big"
"net"
"regexp"
"strconv"
Expand Down Expand Up @@ -5484,3 +5486,283 @@ func (cmd *MonitorCmd) Stop() {
defer cmd.mu.Unlock()
cmd.status = monitorStatusStop
}

// --------------------------------------------------------------------------------------

type GraphCmd struct {
baseCmd

val *GraphResult
}

var _ Cmder = (*GraphCmd)(nil)

func NewGraphCmd(ctx context.Context, args ...any) *GraphCmd {
return &GraphCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}

func (cmd *GraphCmd) SetVal(val *GraphResult) {
cmd.val = val
}

func (cmd *GraphCmd) Val() *GraphResult {
return cmd.val
}

func (cmd *GraphCmd) Result() (*GraphResult, error) {
return cmd.val, cmd.err
}

func (cmd *GraphCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *GraphCmd) readReply(rd *proto.Reader) error {
cmd.val = &GraphResult{}
cmd.val.err = cmd.readGraph(rd)
return cmd.val.err
}

func (cmd *GraphCmd) readGraph(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}
if n != 1 && n != 3 {
return fmt.Errorf("redis: invalid number of elements in graph result: %d", n)
}

if n == 1 {
// create?
cmd.val.noResult = true
if cmd.val.text, err = cmd.readStringArray(rd); err != nil {
return err
}
return nil
}

// n = 3, read graph result
if cmd.val.field, err = cmd.readStringArray(rd); err != nil {
return err
}
fieldLen := len(cmd.val.field)
if fieldLen == 0 {
return nil
}

// read response
rows, err := rd.ReadArrayLen()
if err != nil {
return err
}
cmd.val.rows = make([][]*graphRow, rows)
for i := 0; i < rows; i++ {
// field == row
if err = rd.ReadFixedArrayLen(fieldLen); err != nil {
return err
}

cmd.val.rows[i] = make([]*graphRow, fieldLen)
for f := 0; f < fieldLen; f++ {
next, err := rd.PeekReplyType()
if err != nil {
return err
}
var nn int
switch next {
case proto.RespArray, proto.RespSet, proto.RespPush:
nn, err = rd.ReadArrayLen()
if err != nil {
return err
}
default:
nn = 1
}

// 1: int/string/nil
// 3: node, id + labels + properties
// 5: edge, id + type + src_node + dest_node + properties
switch nn {
case 1:
data, err := cmd.readData(rd)
if err != nil {
return err
}
cmd.val.rows[i][f] = &graphRow{
typ: graphResultBasic,
basic: data,
}
case 3:
node, err := cmd.readNode(rd)
if err != nil {
return err
}
cmd.val.rows[i][f] = &graphRow{
typ: graphResultNode,
node: node,
}
case 5:
edge, err := cmd.readEdge(rd)
if err != nil {
return err
}
cmd.val.rows[i][f] = &graphRow{
typ: graphResultEdge,
edge: edge,
}
default:
return fmt.Errorf("redis: graph-row-field-len, got %d elements in the array, wanted %v", nn, "1/3/5")
}
}
}
if cmd.val.text, err = cmd.readStringArray(rd); err != nil {
return err
}

return err
}

// node = 3, id +labels + properties
func (cmd *GraphCmd) readNode(rd *proto.Reader) (GraphNode, error) {
node := GraphNode{}
for j := 0; j < 3; j++ {
if err := rd.ReadFixedArrayLen(2); err != nil {
return node, err
}
key, err := rd.ReadString()
if err != nil {
return node, err
}
switch key {
case "id":
if node.ID, err = rd.ReadInt(); err != nil {
return node, err
}
case "labels":
if node.Labels, err = cmd.readStringArray(rd); err != nil {
return node, err
}
case "properties":
if node.Properties, err = cmd.readProperties(rd); err != nil {
return node, err
}
default:
return node, fmt.Errorf("redis: invalid graph node key - %s", key)
}
}
return node, nil
}

// edge = 5, id + type + src_node + dest_node + properties
func (cmd *GraphCmd) readEdge(rd *proto.Reader) (GraphEdge, error) {
edge := GraphEdge{}
for j := 0; j < 5; j++ {
if err := rd.ReadFixedArrayLen(2); err != nil {
return edge, err
}
key, err := rd.ReadString()
if err != nil {
return edge, err
}
switch key {
case "id":
if edge.ID, err = rd.ReadInt(); err != nil {
return edge, err
}
case "type":
if edge.Typ, err = rd.ReadString(); err != nil {
return edge, err
}
case "src_node":
if edge.SrcNode, err = rd.ReadInt(); err != nil {
return edge, err
}
case "dest_node":
if edge.DstNode, err = rd.ReadInt(); err != nil {
return edge, err
}
case "properties":
if edge.Properties, err = cmd.readProperties(rd); err != nil {
return edge, err
}
default:
return edge, fmt.Errorf("redis: invalid graph edge key - %s", key)
}
}
return edge, nil
}

func (cmd *GraphCmd) readProperties(rd *proto.Reader) (map[string]GraphData, error) {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
m := make(map[string]GraphData, n)
for i := 0; i < n; i++ {
if err = rd.ReadFixedArrayLen(2); err != nil {
return nil, err
}
key, err := rd.ReadString()
if err != nil {
return nil, err
}
val, err := cmd.readData(rd)
if err != nil {
return nil, err
}
m[key] = val
}
return m, nil
}

func (cmd *GraphCmd) readData(rd *proto.Reader) (GraphData, error) {
var data GraphData
reply, err := rd.ReadReply()
if err != nil {
if errors.Is(err, Nil) {
data.typ = graphNil
return data, nil
}
return data, err
}

switch v := reply.(type) {
case string:
data.typ = graphString
data.stringVal = v
case int64:
data.typ = graphInteger
data.integerVal = v
case *big.Int:
data.typ = graphInteger
if !v.IsInt64() {
return data, fmt.Errorf("redis: bigInt(%s) value out of range", v.String())
}
data.integerVal = v.Int64()
default:
return data, fmt.Errorf("redis: invalid reply - %q", reply)
}
return data, nil
}

func (cmd *GraphCmd) readStringArray(rd *proto.Reader) ([]string, error) {
n, err := rd.ReadArrayLen()
if err != nil {
return nil, err
}
if n == 0 {
return nil, nil
}
ss := make([]string, n)
for i := 0; i < n; i++ {
if ss[i], err = rd.ReadString(); err != nil {
return ss, err
}
}
return ss, nil
}
1 change: 1 addition & 0 deletions commands.go
Expand Up @@ -226,6 +226,7 @@ type Cmdable interface {
StreamCmdable
TimeseriesCmdable
JSONCmdable
GraphCmdable
}

type StatefulCmdable interface {
Expand Down