diff --git a/go.mod b/go.mod index 9bc1fbebf..0f9e0b911 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/nats-io/nats.go go 1.16 require ( - github.com/golang/protobuf v1.5.0 - github.com/nats-io/nats-server/v2 v2.9.3 // indirect github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 - google.golang.org/protobuf v1.28.1 -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index 0a05b0b51..00fe58ccd 100644 --- a/go.sum +++ b/go.sum @@ -1,68 +1,11 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.9.3 h1:HrfzA7G9LNetKkm1z+jU/e9kuAe+E6uaBuuq9EB5sQQ= -github.com/nats-io/nats-server/v2 v2.9.3/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= -github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= -golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= \ No newline at end of file diff --git a/service.go b/service.go deleted file mode 100644 index 863b9b759..000000000 --- a/service.go +++ /dev/null @@ -1,373 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nats - -import ( - "encoding/json" - "errors" - "fmt" - "strings" - "sync" - "time" -) - -type Service interface { - ID() string - Name() string - Description() string - Version() string - Stats() ServiceStats - Reset() - Stop() -} - -// A request handler. -// TODO (could make error more and return more info to user automatically?) -type ServiceHandler func(svc Service, req *Msg) error - -// Clients can request as well. -type ServiceStats struct { - Name string `json:"name"` - ID string `json:"id"` - Version string `json:"version"` - Started time.Time `json:"started"` - Endpoints []Stats `json:"stats"` -} -type Stats struct { - Name string `json:"name"` - NumRequests int `json:"numRequests"` - NumErrors int `json:"numErrors"` - TotalLatency time.Duration `json:"totalLatency"` - Data interface{} `json:"data"` -} - -// We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. -const QG = "svc" - -// ServiceInfo is the basic information about a service type -type ServiceInfo struct { - Name string `json:"name"` - Id string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Subject string `json:"subject"` -} - -func (s *ServiceConfig) Valid() error { - if s.Name == "" { - return errors.New("name is required") - } - return s.Endpoint.Valid() -} - -type ServiceSchema struct { - Request string `json:"request"` - Response string `json:"response"` -} - -type Endpoint struct { - Subject string `json:"subject"` - Handler ServiceHandler -} - -type InternalEndpoint struct { - Name string - Handler MsgHandler -} - -func (e *Endpoint) Valid() error { - s := strings.TrimSpace(e.Subject) - if len(s) == 0 { - return errors.New("subject is required") - } - if e.Handler == nil { - return errors.New("handler is required") - } - return nil -} - -type ServiceConfig struct { - Name string `json:"name"` - Id string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Schema ServiceSchema `json:"schema"` - Endpoint Endpoint `json:"endpoint"` - StatusHandler func(Endpoint) interface{} -} - -// ServiceApiPrefix is the root of all control subjects -const ServiceApiPrefix = "$SRV" - -type ServiceVerb int64 - -const ( - SrvPing ServiceVerb = iota - SrvStatus - SrvInfo - SrvSchema -) - -func (s ServiceVerb) String() string { - switch s { - case SrvPing: - return "PING" - case SrvStatus: - return "STATUS" - case SrvInfo: - return "INFO" - case SrvSchema: - return "SCHEMA" - default: - return "" - } -} - -// ServiceImpl is the internal implementation of a Service -type ServiceImpl struct { - sync.Mutex - ServiceConfig - // subs - reqSub *Subscription - internal map[string]*Subscription - statuses map[string]*Stats - stats *ServiceStats -} - -// addInternalHandler generates control handlers for a specific verb -// each request generates 3 subscriptions, on for the general verb -// affecting all services written with the framework, one that handles -// all services of a particular kind, and finally a specific service. -func (svc *ServiceImpl) addInternalHandler(nc *Conn, verb ServiceVerb, handler MsgHandler) error { - name := fmt.Sprintf("%s-all", verb.String()) - if err := svc._addInternalHandler(nc, verb, "", "", name, handler); err != nil { - return err - } - name = fmt.Sprintf("%s-kind", verb.String()) - if err := svc._addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { - return err - } - return svc._addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) -} - -// _addInternalHandler registers a control subject handler -func (svc *ServiceImpl) _addInternalHandler(nc *Conn, verb ServiceVerb, kind string, id string, name string, handler MsgHandler) error { - subj, err := SvcControlSubject(verb, kind, id) - if err != nil { - svc.Stop() - return err - } - - svc.internal[name], err = nc.Subscribe(subj, func(msg *Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[name] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - svc.Unlock() - }() - handler(msg) - }) - if err != nil { - svc.Stop() - return err - } - - svc.statuses[name] = &Stats{ - Name: name, - } - return nil -} - -// AddService adds a microservice. -// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and -// also have a version that talkes full blown OpenAPI spec and we can pull these things out. -func (nc *Conn) AddService(config ServiceConfig) (Service, error) { - if err := config.Valid(); err != nil { - return nil, err - } - - svc := &ServiceImpl{ServiceConfig: config} - svc.internal = make(map[string]*Subscription) - svc.statuses = make(map[string]*Stats) - svc.statuses[""] = &Stats{ - Name: config.Name, - } - - svc.stats = &ServiceStats{ - Name: config.Name, - ID: config.Id, - Version: config.Version, - Started: time.Now(), - } - - // Setup internal subscriptions. - var err error - - svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *Msg) { - svc.reqHandler(m) - }) - if err != nil { - return nil, err - } - - info := &ServiceInfo{ - Name: config.Name, - Id: config.Id, - Description: config.Description, - Version: config.Version, - Subject: config.Endpoint.Subject, - } - - infoHandler := func(m *Msg) { - response, _ := json.MarshalIndent(info, "", " ") - m.Respond(response) - } - - pingHandler := func(m *Msg) { - infoHandler(m) - } - - statusHandler := func(m *Msg) { - response, _ := json.MarshalIndent(svc.Stats(), "", " ") - m.Respond(response) - } - - schemaHandler := func(m *Msg) { - response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") - m.Respond(response) - } - - if err := svc.addInternalHandler(nc, SrvInfo, infoHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandler(nc, SrvPing, pingHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandler(nc, SrvStatus, statusHandler); err != nil { - return nil, err - } - - if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { - if err := svc.addInternalHandler(nc, SrvSchema, schemaHandler); err != nil { - return nil, err - } - } - - svc.stats.ID = svc.Id - svc.stats.Started = time.Now() - return svc, nil -} - -// reqHandler itself -func (svc *ServiceImpl) reqHandler(req *Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[""] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - svc.Unlock() - }() - - if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { - svc.Lock() - stats := svc.statuses[""] - stats.NumErrors++ - svc.Unlock() - req.Sub.mu.Lock() - nc := req.Sub.conn - req.Sub.mu.Unlock() - - hdr := []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err.Error())) - nc.publish(req.Reply, _EMPTY_, hdr, nil) - } -} - -func (svc *ServiceImpl) Stop() { - if svc.reqSub != nil { - svc.reqSub.Drain() - svc.reqSub = nil - } - var keys []string - for key, sub := range svc.internal { - keys = append(keys, key) - sub.Drain() - } - for _, key := range keys { - delete(svc.internal, key) - } -} - -func (svc *ServiceImpl) ID() string { - return svc.ServiceConfig.Id -} - -func (svc *ServiceImpl) Name() string { - return svc.ServiceConfig.Name -} - -func (svc *ServiceImpl) Description() string { - return svc.ServiceConfig.Description -} - -func (svc *ServiceImpl) Version() string { - return svc.ServiceConfig.Version -} - -func (svc *ServiceImpl) Stats() ServiceStats { - svc.Lock() - defer func() { - svc.Unlock() - }() - if svc.ServiceConfig.StatusHandler != nil { - stats := svc.statuses[""] - stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) - } - idx := 0 - v := make([]Stats, len(svc.statuses)) - for _, se := range svc.statuses { - v[idx] = *se - idx++ - } - svc.stats.Endpoints = v - return *svc.stats -} - -func (svc *ServiceImpl) Reset() { - for _, se := range svc.statuses { - se.NumRequests = 0 - se.TotalLatency = 0 - se.NumErrors = 0 - se.Data = nil - } -} - -// SvcControlSubject returns monitoring subjects used by the ServiceImpl -func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { - sverb := verb.String() - if sverb == "" { - return "", fmt.Errorf("unsupported ServiceImpl verb") - } - kind = strings.ToUpper(kind) - id = strings.ToUpper(id) - if kind == "" && id == "" { - return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil - } - if id == "" { - return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil - } - return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil -} diff --git a/service_test.go b/service_test.go deleted file mode 100644 index 39ffe71d3..000000000 --- a/service_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nats - -import ( - "encoding/json" - "fmt" - "math/rand" - "testing" - "time" -) - -//////////////////////////////////////////////////////////////////////////////// -// Package scoped specific tests here.. -//////////////////////////////////////////////////////////////////////////////// - -func TestServiceBasics(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub ServiceImpl. - doAdd := func(svc Service, req *Msg) error { - if rand.Intn(10) == 0 { - return fmt.Errorf("Unexpected Error!") - } - // Happy Path. - // Random delay between 5-10ms - time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) - req.Respond([]byte("42")) - return nil - } - - // Create 10 ServiceImpl responders. - - var svcs []Service - - // Create 5 ServiceImpl responders. - config := ServiceConfig{ - Name: "CoolAddService", - Version: "v0.1", - Description: "Add things together", - Endpoint: Endpoint{ - Subject: "svc.add", - Handler: doAdd, - }, - Schema: ServiceSchema{Request: "", Response: ""}, - } - - for i := 0; i < 5; i++ { - config.Id = fmt.Sprintf("%d", i) - svc, err := nc.AddService(config) - if err != nil { - t.Fatalf("Expected to create ServiceImpl, got %v", err) - } - defer svc.Stop() - svcs = append(svcs, svc) - } - - // Now send 50 requests. - for i := 0; i < 50; i++ { - _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - } - - for _, svc := range svcs { - if svc.Name() != "CoolAddService" { - t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) - } - if len(svc.Description()) == 0 || len(svc.Version()) == 0 { - t.Fatalf("Expected non emoty description and version") - } - } - - // Make sure we can request info, 1 response. - // This could be exported as well as main ServiceImpl. - subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") - if err != nil { - t.Fatalf("Failed to building info subject %v", err) - } - info, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - inf := ServiceInfo{} - json.Unmarshal(info.Data, &inf) - if inf.Subject != "svc.add" { - t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) - } - fmt.Printf("\ninfo response:\n%s\n\n", info.Data) - - // Get stats for all the nodes. Multiple responses. - // could do STATZ too? - inbox := NewInbox() - sub, err := nc.SubscribeSync(inbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - if err := nc.PublishRequest("svc.add.PING", inbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - for { - resp, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - fmt.Printf("Received ping response: %s\n", resp.Data) - } - - subj, err = SvcControlSubject(SrvStatus, "CoolAddService", "") - if err != nil { - t.Fatalf("unexpected error from stats: %v", err) - } - - r, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("request error: %v", err) - } - status := ServiceStats{} - if err := json.Unmarshal(r.Data, &status); err != nil { - t.Fatalf("unexpected error from stats: %v", err) - } - if len(status.Endpoints) != 10 { - t.Fatal("expected 10 endpoints") - } -} diff --git a/services/errors.go b/services/errors.go new file mode 100644 index 000000000..9ad1c5452 --- /dev/null +++ b/services/errors.go @@ -0,0 +1,25 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import "fmt" + +type ServiceAPIError struct { + ErrorCode int + Description string +} + +func (e *ServiceAPIError) Error() string { + return fmt.Sprintf("%d %s", e.ErrorCode, e.Description) +} diff --git a/services/service.go b/services/service.go new file mode 100644 index 000000000..0e4b61508 --- /dev/null +++ b/services/service.go @@ -0,0 +1,404 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. + +type ( + + // Service is an interface for sevice management. + // It exposes methods to stop/reset a service, as well as get information on a service. + Service interface { + ID() string + Name() string + Description() string + Version() string + Stats() ServiceStats + Reset() + Stop() + } + + // A request handler. + // TODO (could make error more and return more info to user automatically?) + ServiceHandler func(svc Service, req *nats.Msg) error + + // Clients can request as well. + ServiceStats struct { + Name string `json:"name"` + ID string `json:"id"` + Version string `json:"version"` + Started time.Time `json:"started"` + Endpoints []Stats `json:"stats"` + } + + Stats struct { + Name string `json:"name"` + NumRequests int `json:"num_requests"` + NumErrors int `json:"num_errors"` + TotalLatency time.Duration `json:"total_latency"` + AverageLatency time.Duration `json:"average_latency"` + Data interface{} `json:"data"` + } + + // ServiceInfo is the basic information about a service type + ServiceInfo struct { + Name string `json:"name"` + ID string `json:"id"` + Description string `json:"description"` + Version string `json:"version"` + Subject string `json:"subject"` + } + + ServiceSchema struct { + Request string `json:"request"` + Response string `json:"response"` + } + + Endpoint struct { + Subject string `json:"subject"` + Handler ServiceHandler + } + + InternalEndpoint struct { + Name string + Handler nats.MsgHandler + } + + ServiceVerb int64 + + ServiceConfig struct { + Name string `json:"name"` + Description string `json:"description"` + Version string `json:"version"` + Schema ServiceSchema `json:"schema"` + Endpoint Endpoint `json:"endpoint"` + StatusHandler func(Endpoint) interface{} + } + + // service is the internal implementation of a Service + service struct { + sync.Mutex + ServiceConfig + id string + // subs + reqSub *nats.Subscription + internal map[string]*nats.Subscription + statuses map[string]*Stats + stats *ServiceStats + conn *nats.Conn + } +) + +const ( + // We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. + QG = "svc" + + // ServiceApiPrefix is the root of all control subjects + ServiceApiPrefix = "$SRV" + + ServiceErrorHeader = "Nats-Service-Error" +) + +const ( + SrvPing ServiceVerb = iota + SrvStatus + SrvInfo + SrvSchema +) + +func (s *ServiceConfig) Valid() error { + if s.Name == "" { + return errors.New("name is required") + } + return s.Endpoint.Valid() +} + +func (e *Endpoint) Valid() error { + s := strings.TrimSpace(e.Subject) + if len(s) == 0 { + return errors.New("subject is required") + } + if e.Handler == nil { + return errors.New("handler is required") + } + return nil +} + +func (s ServiceVerb) String() string { + switch s { + case SrvPing: + return "PING" + case SrvStatus: + return "STATUS" + case SrvInfo: + return "INFO" + case SrvSchema: + return "SCHEMA" + default: + return "" + } +} + +// Add adds a microservice. +// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and +// also have a version that talkes full blown OpenAPI spec and we can pull these things out. +func Add(nc *nats.Conn, config ServiceConfig) (Service, error) { + if err := config.Valid(); err != nil { + return nil, err + } + + id := nuid.Next() + svc := &service{ + ServiceConfig: config, + conn: nc, + id: id, + } + svc.internal = make(map[string]*nats.Subscription) + svc.statuses = make(map[string]*Stats) + svc.statuses[""] = &Stats{ + Name: config.Name, + } + + svc.stats = &ServiceStats{ + Name: config.Name, + ID: id, + Version: config.Version, + Started: time.Now(), + } + + // Setup internal subscriptions. + var err error + + svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { + svc.reqHandler(m) + }) + if err != nil { + return nil, err + } + + info := &ServiceInfo{ + Name: config.Name, + ID: id, + Description: config.Description, + Version: config.Version, + Subject: config.Endpoint.Subject, + } + + infoHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(info, "", " ") + m.Respond(response) + } + + pingHandler := func(m *nats.Msg) { + infoHandler(m) + } + + statusHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(svc.Stats(), "", " ") + m.Respond(response) + } + + schemaHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") + m.Respond(response) + } + + if err := svc.addInternalHandlerGroup(nc, SrvInfo, infoHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandlerGroup(nc, SrvPing, pingHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandlerGroup(nc, SrvStatus, statusHandler); err != nil { + return nil, err + } + + if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { + if err := svc.addInternalHandlerGroup(nc, SrvSchema, schemaHandler); err != nil { + return nil, err + } + } + + svc.stats.ID = id + svc.stats.Started = time.Now() + return svc, nil +} + +// addInternalHandlerGroup generates control handlers for a specific verb +// each request generates 3 subscriptions, one for the general verb +// affecting all services written with the framework, one that handles +// all services of a particular kind, and finally a specific service. +func (svc *service) addInternalHandlerGroup(nc *nats.Conn, verb ServiceVerb, handler nats.MsgHandler) error { + name := fmt.Sprintf("%s-all", verb.String()) + if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { + return err + } + name = fmt.Sprintf("%s-kind", verb.String()) + if err := svc.addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { + return err + } + return svc.addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) +} + +// addInternalHandler registers a control subject handler +func (svc *service) addInternalHandler(nc *nats.Conn, verb ServiceVerb, kind, id, name string, handler nats.MsgHandler) error { + subj, err := SvcControlSubject(verb, kind, id) + if err != nil { + svc.Stop() + return err + } + + svc.internal[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { + start := time.Now() + defer func() { + svc.Lock() + stats := svc.statuses[name] + stats.NumRequests++ + stats.TotalLatency += time.Since(start) + stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) + svc.Unlock() + }() + handler(msg) + }) + if err != nil { + svc.Stop() + return err + } + + svc.statuses[name] = &Stats{ + Name: name, + } + return nil +} + +// reqHandler itself +func (svc *service) reqHandler(req *nats.Msg) { + start := time.Now() + defer func() { + svc.Lock() + stats := svc.statuses[""] + stats.NumRequests++ + stats.TotalLatency += time.Since(start) + stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) + svc.Unlock() + }() + + if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { + hdr := make(nats.Header) + apiErr := &ServiceAPIError{} + if ok := errors.As(err, &apiErr); !ok { + hdr[ServiceErrorHeader] = []string{fmt.Sprintf("%d %s", 500, err.Error())} + } else { + hdr[ServiceErrorHeader] = []string{apiErr.Error()} + } + svc.Lock() + stats := svc.statuses[""] + stats.NumErrors++ + svc.Unlock() + + svc.conn.PublishMsg(&nats.Msg{ + Subject: req.Reply, + Header: hdr, + }) + } +} + +func (svc *service) Stop() { + if svc.reqSub != nil { + svc.reqSub.Drain() + svc.reqSub = nil + } + var keys []string + for key, sub := range svc.internal { + keys = append(keys, key) + sub.Drain() + } + for _, key := range keys { + delete(svc.internal, key) + } +} + +func (svc *service) ID() string { + return svc.id +} + +func (svc *service) Name() string { + return svc.ServiceConfig.Name +} + +func (svc *service) Description() string { + return svc.ServiceConfig.Description +} + +func (svc *service) Version() string { + return svc.ServiceConfig.Version +} + +func (svc *service) Stats() ServiceStats { + svc.Lock() + defer func() { + svc.Unlock() + }() + if svc.ServiceConfig.StatusHandler != nil { + stats := svc.statuses[""] + stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) + } + idx := 0 + v := make([]Stats, len(svc.statuses)) + for _, se := range svc.statuses { + v[idx] = *se + idx++ + } + svc.stats.Endpoints = v + return *svc.stats +} + +func (svc *service) Reset() { + for _, se := range svc.statuses { + se.NumRequests = 0 + se.TotalLatency = 0 + se.NumErrors = 0 + se.Data = nil + } +} + +// SvcControlSubject returns monitoring subjects used by the ServiceImpl +func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { + sverb := verb.String() + if sverb == "" { + return "", fmt.Errorf("unsupported service verb") + } + kind = strings.ToUpper(kind) + if kind == "" && id == "" { + return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil + } + if id == "" { + return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil + } + return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil +} diff --git a/services/service_test.go b/services/service_test.go new file mode 100644 index 000000000..532be8915 --- /dev/null +++ b/services/service_test.go @@ -0,0 +1,260 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import ( + "encoding/json" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" +) + +func TestServiceBasics(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + doAdd := func(svc Service, req *nats.Msg) error { + if rand.Intn(10) == 0 { + return fmt.Errorf("Unexpected Error!") + } + // Happy Path. + // Random delay between 5-10ms + time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) + if err := req.Respond([]byte("42")); err != nil { + return err + } + return nil + } + + var svcs []Service + + // Create 5 service responders. + config := ServiceConfig{ + Name: "CoolAddService", + Version: "v0.1", + Description: "Add things together", + Endpoint: Endpoint{ + Subject: "svc.add", + Handler: doAdd, + }, + Schema: ServiceSchema{Request: "", Response: ""}, + } + + for i := 0; i < 5; i++ { + svc, err := Add(nc, config) + if err != nil { + t.Fatalf("Expected to create Service, got %v", err) + } + defer svc.Stop() + svcs = append(svcs, svc) + } + + // Now send 50 requests. + for i := 0; i < 50; i++ { + _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + } + + for _, svc := range svcs { + if svc.Name() != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) + } + if len(svc.Description()) == 0 || len(svc.Version()) == 0 { + t.Fatalf("Expected non empty description and version") + } + } + + // Make sure we can request info, 1 response. + // This could be exported as well as main ServiceImpl. + subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") + if err != nil { + t.Fatalf("Failed to building info subject %v", err) + } + info, err := nc.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + var inf ServiceInfo + if err := json.Unmarshal(info.Data, &inf); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if inf.Subject != "svc.add" { + t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + } + + // Ping all services. Multiple responses. + // could do STATZ too? + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + pingSubject, err := SvcControlSubject(SrvPing, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var pingCount int + for { + _, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + pingCount++ + } + if pingCount != 5 { + t.Fatalf("Expected 5 ping responses, got: %d", pingCount) + } + + // Get stats from all services + statsInbox := nats.NewInbox() + sub, err = nc.SubscribeSync(statsInbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + statsSubject, err := SvcControlSubject(SrvStatus, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + stats := make([]ServiceStats, 0) + var requestsNum int + for { + resp, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + var srvStats ServiceStats + if err := json.Unmarshal(resp.Data, &srvStats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(srvStats.Endpoints) != 10 { + t.Fatalf("Expected 10 endpoints on a serivce, got: %d", len(srvStats.Endpoints)) + } + for _, e := range srvStats.Endpoints { + if e.Name == "CoolAddService" { + requestsNum += e.NumRequests + } + } + stats = append(stats, srvStats) + } + if len(stats) != 5 { + t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) + } + + // Services should process 50 requests total + if requestsNum != 50 { + t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) + } +} + +func TestServiceErrors(t *testing.T) { + tests := []struct { + name string + handlerResponse error + expectedStatus string + }{ + { + name: "generic error", + handlerResponse: fmt.Errorf("oops"), + expectedStatus: "500 oops", + }, + { + name: "api error", + handlerResponse: &ServiceAPIError{ErrorCode: 400, Description: "oops"}, + expectedStatus: "400 oops", + }, + { + name: "no error", + handlerResponse: nil, + expectedStatus: "", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + handler := func(svc Service, req *nats.Msg) error { + if test.handlerResponse == nil { + if err := req.Respond([]byte("ok")); err != nil { + return err + } + } + return test.handlerResponse + } + + svc, err := Add(nc, ServiceConfig{ + Name: "CoolService", + Description: "Erroring service", + Endpoint: Endpoint{ + Subject: "svc.fail", + Handler: handler, + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer svc.Stop() + + resp, err := nc.Request("svc.fail", nil, 1*time.Second) + if err != nil { + t.Fatalf("request error") + } + + status := resp.Header.Get("Nats-Service-Error") + if status != test.expectedStatus { + t.Fatalf("Invalid response status; want: %q; got: %q", test.expectedStatus, status) + } + }) + } +} + +func RunServerOnPort(port int) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = port + return RunServerWithOptions(&opts) +} + +func RunServerWithOptions(opts *server.Options) *server.Server { + return natsserver.RunServer(opts) +}