From caedfdf72c6ab7bb1047499de9cd62135bc736be Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 15 Sep 2022 17:57:29 -0700 Subject: [PATCH 1/6] Services WIP Signed-off-by: Derek Collison --- service.go | 182 ++++++++++++++++++++++++++++++++++++++++++++++++ service_test.go | 114 ++++++++++++++++++++++++++++++ 2 files changed, 296 insertions(+) create mode 100644 service.go create mode 100644 service_test.go diff --git a/service.go b/service.go new file mode 100644 index 000000000..0335325c0 --- /dev/null +++ b/service.go @@ -0,0 +1,182 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/nats-io/nuid" +) + +type Service interface { + ID() string + Name() string + Description() string + Version() string + + // Stats + Stats() Stats + Reset() + + Close() +} + +// A request handler. +// TODO (could make error more and return more info to user automatically?) +type RequestHandler func(svc Service, req *Msg) error + +// Clients can request as well. +type Stats struct { + ID string + Started time.Time + NumRequests int + NumErrors int + TotalLatency time.Duration +} + +// We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. +const QG = "svc" + +// When a request for info comes in we return json of this. +type ServiceInfo struct { + Name string + Description string + Version string +} + +// Internal struct +type service struct { + id string + name string + description string + version string + stats Stats + // subs + reqSub *Subscription + // info + infoSub *Subscription + // stats + pingSub *Subscription + + cb RequestHandler +} + +// Add a microservice. +// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and +// also have a version that talkes full blown OpenAPI spec and we can pull these things out. +func (nc *Conn) AddService(name, description, version, subject, reqSchema, respSchema string, reqHandler RequestHandler) (Service, error) { + + // NOTE WIP + svc := &service{id: nuid.Next(), name: name, description: description, version: version, cb: reqHandler} + + // Setup internal subscriptions. + var err error + + svc.reqSub, err = nc.QueueSubscribe(subject, QG, func(m *Msg) { svc.reqHandler(m) }) + if err != nil { + return nil, err + } + + // Construct info sub from main subject. + info := fmt.Sprintf("%s.INFO", subject) + svc.infoSub, err = nc.QueueSubscribe(info, QG, func(m *Msg) { + si := &ServiceInfo{ + Name: svc.name, + Description: svc.description, + Version: svc.version, + } + response, _ := json.MarshalIndent(si, "", " ") + m.Respond(response) + }) + if err != nil { + svc.Close() + return nil, err + } + + // Construct ping sub from main subject. + // These will be responded by all handlers. + ping := fmt.Sprintf("%s.PING", subject) + svc.infoSub, err = nc.Subscribe(ping, func(m *Msg) { + response, _ := json.MarshalIndent(svc.stats, "", " ") + m.Respond(response) + }) + if err != nil { + svc.Close() + return nil, err + } + + svc.stats.ID = svc.id + svc.stats.Started = time.Now() + return svc, nil +} + +// reqHandler itself +func (svc *service) reqHandler(req *Msg) { + start := time.Now() + defer func() { + svc.stats.NumRequests++ + svc.stats.TotalLatency += time.Since(start) + }() + + if err := svc.cb(svc, req); err != nil { + svc.stats.NumErrors++ + req.Sub.mu.Lock() + nc := req.Sub.conn + req.Sub.mu.Unlock() + + hdr := []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err.Error())) + nc.publish(req.Reply, _EMPTY_, hdr, nil) + } +} + +func (svc *service) Close() { + if svc.reqSub != nil { + svc.reqSub.Drain() + svc.reqSub = nil + } + if svc.infoSub != nil { + svc.infoSub.Drain() + svc.infoSub = nil + } + if svc.pingSub != nil { + svc.pingSub.Drain() + svc.pingSub = nil + } +} + +func (svc *service) ID() string { + return svc.id +} + +func (svc *service) Name() string { + return svc.name +} + +func (svc *service) Description() string { + return svc.description +} + +func (svc *service) Version() string { + return svc.version +} + +func (svc *service) Stats() Stats { + return svc.stats +} + +func (svc *service) Reset() { + svc.stats = Stats{} +} diff --git a/service_test.go b/service_test.go new file mode 100644 index 000000000..4295e4d53 --- /dev/null +++ b/service_test.go @@ -0,0 +1,114 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nats + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +//////////////////////////////////////////////////////////////////////////////// +// Package scoped specific tests here.. +//////////////////////////////////////////////////////////////////////////////// + +func TestServiceBasics(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + doAdd := func(svc Service, req *Msg) error { + if rand.Intn(10) == 0 { + return fmt.Errorf("Unexpected Error!") + } + // Happy Path. + // Random delay between 5-10ms + time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) + req.Respond([]byte("42")) + return nil + } + + // Create 10 service responders. + + var svcs []Service + + // Create 5 service responders. + for i := 0; i < 5; i++ { + svc, err := nc.AddService( + "CoolAddService", + "Add things together", + "v0.1", + "svc.add", + _EMPTY_, // TBD - request schema + _EMPTY_, // TBD - response schema + doAdd, + ) + if err != nil { + t.Fatalf("Expected to create service, got %v", err) + } + defer svc.Close() + svcs = append(svcs, svc) + } + + // Now send 50 requests. + for i := 0; i < 50; i++ { + _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + } + + for _, svc := range svcs { + if svc.Name() != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) + } + if len(svc.Description()) == 0 || len(svc.Version()) == 0 { + t.Fatalf("Expected non emoty description and version") + } + } + + // Make sure we can request info, 1 response. + // This could be exported as well as main service. + info, err := nc.Request("svc.add.INFO", nil, time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + fmt.Printf("\ninfo response:\n%s\n\n", info.Data) + + // Get stats for all the nodes. Multiple responses. + // could do STATZ too? + inbox := NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + if err := nc.PublishRequest("svc.add.PING", inbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for { + resp, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + fmt.Printf("Received ping response: %s\n", resp.Data) + } +} From b1007e79d8e4da73b16fb48f9750251a75c56969 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 19 Oct 2022 17:13:01 -0500 Subject: [PATCH 2/6] [WIP] align services in go to javascript poc --- service.go | 335 ++++++++++++++++++++++++++++++++++++------------ service_test.go | 57 +++++--- 2 files changed, 293 insertions(+), 99 deletions(-) diff --git a/service.go b/service.go index 0335325c0..64e941460 100644 --- a/service.go +++ b/service.go @@ -15,10 +15,10 @@ package nats import ( "encoding/json" + "errors" "fmt" + "strings" "time" - - "github.com/nats-io/nuid" ) type Service interface { @@ -26,113 +26,249 @@ type Service interface { Name() string Description() string Version() string - - // Stats - Stats() Stats + Stats() ServiceStats Reset() - - Close() + Stop() } // A request handler. // TODO (could make error more and return more info to user automatically?) -type RequestHandler func(svc Service, req *Msg) error +type ServiceHandler func(svc Service, req *Msg) error // Clients can request as well. +type ServiceStats struct { + Name string `json:"name"` + ID string `json:"id"` + Version string `json:"version"` + Started time.Time `json:"started"` + Endpoints []Stats `json:"stats"` +} type Stats struct { - ID string - Started time.Time - NumRequests int - NumErrors int - TotalLatency time.Duration + Name string `json:"name"` + NumRequests int `json:"numRequests"` + NumErrors int `json:"numErrors"` + TotalLatency time.Duration `json:"totalLatency"` + Data interface{} `json:"data"` } // We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. const QG = "svc" -// When a request for info comes in we return json of this. +// ServiceInfo is the basic information about a service type type ServiceInfo struct { - Name string - Description string - Version string -} - -// Internal struct -type service struct { - id string - name string - description string - version string - stats Stats + Name string `json:"name"` + Id string `json:"id"` + Description string `json:"description"` + Version string `json:"version"` +} + +func (s *ServiceInfo) Valid() error { + if s.Name == "" { + return errors.New("name is required") + } + return nil +} + +type ServiceSchema struct { + Request string `json:"request"` + Response string `json:"response"` +} + +type Endpoint struct { + Subject string `json:"subject"` + Handler ServiceHandler +} + +type InternalEndpoint struct { + Name string + Handler MsgHandler +} + +func (e *Endpoint) Valid() error { + s := strings.TrimSpace(e.Subject) + if len(s) == 0 { + return errors.New("subject is required") + } + if e.Handler == nil { + return errors.New("handler is required") + } + return nil +} + +type ServiceConfig struct { + ServiceInfo + Schema ServiceSchema `json:"schema"` + Endpoint Endpoint `json:"endpoint"` + StatusHandler func(Endpoint) interface{} +} + +// ServiceApiPrefix is the root of all control subjects +const ServiceApiPrefix = "$SRV" + +type ServiceVerb int64 + +const ( + SrvPing ServiceVerb = iota + SrvStatus + SrvInfo + SrvSchema +) + +func (s ServiceVerb) String() string { + switch s { + case SrvPing: + return "PING" + case SrvStatus: + return "STATUS" + case SrvInfo: + return "INFO" + case SrvSchema: + return "SCHEMA" + default: + return "" + } +} + +// ServiceImpl is the internal implementation of a Service +type ServiceImpl struct { + ServiceConfig // subs - reqSub *Subscription - // info - infoSub *Subscription - // stats - pingSub *Subscription + reqSub *Subscription + internal map[string]*Subscription + statuses map[string]*Stats + stats *ServiceStats +} - cb RequestHandler +// addInternalHandler generates control handlers for a specific verb +// each request generates 3 subscriptions, on for the general verb +// affecting all services written with the framework, one that handles +// all services of a particular kind, and finally a specific service. +func (svc *ServiceImpl) addInternalHandler(nc *Conn, verb ServiceVerb, handler MsgHandler) error { + name := fmt.Sprintf("%s-all", verb.String()) + if err := svc._addInternalHandler(nc, verb, "", "", name, handler); err != nil { + return err + } + name = fmt.Sprintf("%s-kind", verb.String()) + if err := svc._addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { + return err + } + return svc._addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) } -// Add a microservice. +// _addInternalHandler registers a control subject handler +func (svc *ServiceImpl) _addInternalHandler(nc *Conn, verb ServiceVerb, kind string, id string, name string, handler MsgHandler) error { + subj, err := SvcControlSubject(verb, kind, id) + if err != nil { + svc.Stop() + return err + } + + svc.internal[name], err = nc.Subscribe(subj, func(msg *Msg) { + start := time.Now() + stats := svc.statuses[name] + defer func() { + stats.NumRequests++ + stats.TotalLatency += time.Since(start) + }() + handler(msg) + }) + if err != nil { + svc.Stop() + return err + } + + svc.statuses[name] = &Stats{ + Name: name, + } + return nil +} + +// AddService adds a microservice. // NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and // also have a version that talkes full blown OpenAPI spec and we can pull these things out. -func (nc *Conn) AddService(name, description, version, subject, reqSchema, respSchema string, reqHandler RequestHandler) (Service, error) { +func (nc *Conn) AddService(config ServiceConfig) (Service, error) { + if err := config.ServiceInfo.Valid(); err != nil { + return nil, err + } + if err := config.Endpoint.Valid(); err != nil { + return nil, err + } - // NOTE WIP - svc := &service{id: nuid.Next(), name: name, description: description, version: version, cb: reqHandler} + svc := &ServiceImpl{ServiceConfig: config} + svc.internal = make(map[string]*Subscription) + svc.statuses = make(map[string]*Stats) + svc.statuses[""] = &Stats{ + Name: config.Name, + } + + svc.stats = &ServiceStats{ + Name: config.Name, + ID: config.Id, + Version: config.Version, + Started: time.Now(), + } // Setup internal subscriptions. var err error - svc.reqSub, err = nc.QueueSubscribe(subject, QG, func(m *Msg) { svc.reqHandler(m) }) + svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *Msg) { + svc.reqHandler(m) + }) if err != nil { return nil, err } - // Construct info sub from main subject. - info := fmt.Sprintf("%s.INFO", subject) - svc.infoSub, err = nc.QueueSubscribe(info, QG, func(m *Msg) { - si := &ServiceInfo{ - Name: svc.name, - Description: svc.description, - Version: svc.version, - } - response, _ := json.MarshalIndent(si, "", " ") + infoHandler := func(m *Msg) { + response, _ := json.MarshalIndent(config.ServiceInfo, "", " ") m.Respond(response) - }) - if err != nil { - svc.Close() - return nil, err } - // Construct ping sub from main subject. - // These will be responded by all handlers. - ping := fmt.Sprintf("%s.PING", subject) - svc.infoSub, err = nc.Subscribe(ping, func(m *Msg) { - response, _ := json.MarshalIndent(svc.stats, "", " ") + pingHandler := func(m *Msg) { + infoHandler(m) + } + + statusHandler := func(m *Msg) { + response, _ := json.MarshalIndent(svc.Stats(), "", " ") m.Respond(response) - }) - if err != nil { - svc.Close() + } + + schemaHandler := func(m *Msg) { + response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") + m.Respond(response) + } + + if err := svc.addInternalHandler(nc, SrvInfo, infoHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandler(nc, SrvPing, pingHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandler(nc, SrvStatus, statusHandler); err != nil { return nil, err } - svc.stats.ID = svc.id + if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { + if err := svc.addInternalHandler(nc, SrvSchema, schemaHandler); err != nil { + return nil, err + } + } + + svc.stats.ID = svc.ServiceInfo.Id svc.stats.Started = time.Now() return svc, nil } // reqHandler itself -func (svc *service) reqHandler(req *Msg) { +func (svc *ServiceImpl) reqHandler(req *Msg) { start := time.Now() + stats := svc.statuses[""] defer func() { - svc.stats.NumRequests++ - svc.stats.TotalLatency += time.Since(start) + stats.NumRequests++ + stats.TotalLatency += time.Since(start) }() - if err := svc.cb(svc, req); err != nil { - svc.stats.NumErrors++ + if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { + stats.NumErrors++ req.Sub.mu.Lock() nc := req.Sub.conn req.Sub.mu.Unlock() @@ -142,41 +278,74 @@ func (svc *service) reqHandler(req *Msg) { } } -func (svc *service) Close() { +func (svc *ServiceImpl) Stop() { if svc.reqSub != nil { svc.reqSub.Drain() svc.reqSub = nil } - if svc.infoSub != nil { - svc.infoSub.Drain() - svc.infoSub = nil + var keys []string + for key, sub := range svc.internal { + keys = append(keys, key) + sub.Drain() } - if svc.pingSub != nil { - svc.pingSub.Drain() - svc.pingSub = nil + for _, key := range keys { + delete(svc.internal, key) } } -func (svc *service) ID() string { - return svc.id +func (svc *ServiceImpl) ID() string { + return svc.ServiceConfig.Id } -func (svc *service) Name() string { - return svc.name +func (svc *ServiceImpl) Name() string { + return svc.ServiceConfig.Name } -func (svc *service) Description() string { - return svc.description +func (svc *ServiceImpl) Description() string { + return svc.ServiceConfig.Description } -func (svc *service) Version() string { - return svc.version +func (svc *ServiceImpl) Version() string { + return svc.ServiceConfig.Version } -func (svc *service) Stats() Stats { - return svc.stats +func (svc *ServiceImpl) Stats() ServiceStats { + if svc.ServiceConfig.StatusHandler != nil { + stats := svc.statuses[""] + stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) + } + idx := 0 + v := make([]Stats, len(svc.statuses)) + for _, se := range svc.statuses { + v[idx] = *se + idx++ + } + svc.stats.Endpoints = v + return *svc.stats } -func (svc *service) Reset() { - svc.stats = Stats{} +func (svc *ServiceImpl) Reset() { + for _, se := range svc.statuses { + se.NumRequests = 0 + se.TotalLatency = 0 + se.NumErrors = 0 + se.Data = nil + } +} + +// SvcControlSubject returns monitoring subjects used by the ServiceImpl +func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { + sverb := verb.String() + if sverb == "" { + return "", fmt.Errorf("unsupported ServiceImpl verb") + } + kind = strings.ToUpper(kind) + id = strings.ToUpper(id) + if kind == "" && id == "" { + return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil + } + if id == "" { + return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil + } + return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil } diff --git a/service_test.go b/service_test.go index 4295e4d53..bd8ad263f 100644 --- a/service_test.go +++ b/service_test.go @@ -14,6 +14,7 @@ package nats import ( + "encoding/json" "fmt" "math/rand" "testing" @@ -34,7 +35,7 @@ func TestServiceBasics(t *testing.T) { } defer nc.Close() - // Stub service. + // Stub ServiceImpl. doAdd := func(svc Service, req *Msg) error { if rand.Intn(10) == 0 { return fmt.Errorf("Unexpected Error!") @@ -46,25 +47,31 @@ func TestServiceBasics(t *testing.T) { return nil } - // Create 10 service responders. + // Create 10 ServiceImpl responders. var svcs []Service - // Create 5 service responders. + // Create 5 ServiceImpl responders. + config := ServiceConfig{ + ServiceInfo: ServiceInfo{ + Name: "CoolAddService", + Version: "v0.1", + Description: "Add things together", + }, + Endpoint: Endpoint{ + Subject: "svc.add", + Handler: doAdd, + }, + Schema: ServiceSchema{Request: "", Response: ""}, + } + for i := 0; i < 5; i++ { - svc, err := nc.AddService( - "CoolAddService", - "Add things together", - "v0.1", - "svc.add", - _EMPTY_, // TBD - request schema - _EMPTY_, // TBD - response schema - doAdd, - ) + config.Id = fmt.Sprintf("%d", i) + svc, err := nc.AddService(config) if err != nil { - t.Fatalf("Expected to create service, got %v", err) + t.Fatalf("Expected to create ServiceImpl, got %v", err) } - defer svc.Close() + defer svc.Stop() svcs = append(svcs, svc) } @@ -86,8 +93,12 @@ func TestServiceBasics(t *testing.T) { } // Make sure we can request info, 1 response. - // This could be exported as well as main service. - info, err := nc.Request("svc.add.INFO", nil, time.Second) + // This could be exported as well as main ServiceImpl. + subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") + if err != nil { + t.Fatalf("Failed to building info subject %v", err) + } + info, err := nc.Request(subj, nil, time.Second) if err != nil { t.Fatalf("Expected a response, got %v", err) } @@ -111,4 +122,18 @@ func TestServiceBasics(t *testing.T) { } fmt.Printf("Received ping response: %s\n", resp.Data) } + + subj, err = SvcControlSubject(SrvStatus, "CoolAddService", "") + if err != nil { + t.Fatalf("unexpected error from stats: %v", err) + } + + r, err := nc.Request(subj, nil, time.Second) + status := ServiceStats{} + if err := json.Unmarshal(r.Data, &status); err != nil { + t.Fatalf("unexpected error from stats: %v", err) + } + if len(status.Endpoints) != 10 { + t.Fatal("expected 10 endpoints") + } } From 680718feda59c5f6d168c26d3d1085f0fc656c6c Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 19 Oct 2022 17:20:06 -0500 Subject: [PATCH 3/6] lint --- go.mod | 3 +++ go.sum | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ service_test.go | 3 +++ 3 files changed, 63 insertions(+) diff --git a/go.mod b/go.mod index 63faedf43..9bc1fbebf 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,9 @@ module github.com/nats-io/nats.go go 1.16 require ( + github.com/golang/protobuf v1.5.0 + github.com/nats-io/nats-server/v2 v2.9.3 // indirect github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 + google.golang.org/protobuf v1.28.1 ) diff --git a/go.sum b/go.sum index 2138ffb56..0a05b0b51 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,68 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= +github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= +github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= +github.com/nats-io/nats-server/v2 v2.9.3 h1:HrfzA7G9LNetKkm1z+jU/e9kuAe+E6uaBuuq9EB5sQQ= +github.com/nats-io/nats-server/v2 v2.9.3/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= +github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= +golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= +golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= +golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/service_test.go b/service_test.go index bd8ad263f..3b6fba682 100644 --- a/service_test.go +++ b/service_test.go @@ -129,6 +129,9 @@ func TestServiceBasics(t *testing.T) { } r, err := nc.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("request error: %v", err) + } status := ServiceStats{} if err := json.Unmarshal(r.Data, &status); err != nil { t.Fatalf("unexpected error from stats: %v", err) From bba377a37645d7606a157e58227e366b15b26394 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 19 Oct 2022 17:41:44 -0500 Subject: [PATCH 4/6] fixed data race --- service.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/service.go b/service.go index 64e941460..5d4f77e91 100644 --- a/service.go +++ b/service.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" ) @@ -131,6 +132,7 @@ func (s ServiceVerb) String() string { // ServiceImpl is the internal implementation of a Service type ServiceImpl struct { + sync.Mutex ServiceConfig // subs reqSub *Subscription @@ -165,10 +167,12 @@ func (svc *ServiceImpl) _addInternalHandler(nc *Conn, verb ServiceVerb, kind str svc.internal[name], err = nc.Subscribe(subj, func(msg *Msg) { start := time.Now() - stats := svc.statuses[name] defer func() { + svc.Lock() + stats := svc.statuses[name] stats.NumRequests++ stats.TotalLatency += time.Since(start) + svc.Unlock() }() handler(msg) }) @@ -261,14 +265,19 @@ func (nc *Conn) AddService(config ServiceConfig) (Service, error) { // reqHandler itself func (svc *ServiceImpl) reqHandler(req *Msg) { start := time.Now() - stats := svc.statuses[""] defer func() { + svc.Lock() + stats := svc.statuses[""] stats.NumRequests++ stats.TotalLatency += time.Since(start) + svc.Unlock() }() if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { + svc.Lock() + stats := svc.statuses[""] stats.NumErrors++ + svc.Unlock() req.Sub.mu.Lock() nc := req.Sub.conn req.Sub.mu.Unlock() @@ -310,6 +319,10 @@ func (svc *ServiceImpl) Version() string { } func (svc *ServiceImpl) Stats() ServiceStats { + svc.Lock() + defer func() { + svc.Unlock() + }() if svc.ServiceConfig.StatusHandler != nil { stats := svc.statuses[""] stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) From 43050b21e66c51d4049f84bae9ffdcb571bf25d0 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 20 Oct 2022 09:09:53 -0500 Subject: [PATCH 5/6] added subject field to ServiceInfo --- service.go | 27 ++++++++++++++++++--------- service_test.go | 13 ++++++++----- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/service.go b/service.go index 5d4f77e91..863b9b759 100644 --- a/service.go +++ b/service.go @@ -61,13 +61,14 @@ type ServiceInfo struct { Id string `json:"id"` Description string `json:"description"` Version string `json:"version"` + Subject string `json:"subject"` } -func (s *ServiceInfo) Valid() error { +func (s *ServiceConfig) Valid() error { if s.Name == "" { return errors.New("name is required") } - return nil + return s.Endpoint.Valid() } type ServiceSchema struct { @@ -97,7 +98,10 @@ func (e *Endpoint) Valid() error { } type ServiceConfig struct { - ServiceInfo + Name string `json:"name"` + Id string `json:"id"` + Description string `json:"description"` + Version string `json:"version"` Schema ServiceSchema `json:"schema"` Endpoint Endpoint `json:"endpoint"` StatusHandler func(Endpoint) interface{} @@ -191,10 +195,7 @@ func (svc *ServiceImpl) _addInternalHandler(nc *Conn, verb ServiceVerb, kind str // NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and // also have a version that talkes full blown OpenAPI spec and we can pull these things out. func (nc *Conn) AddService(config ServiceConfig) (Service, error) { - if err := config.ServiceInfo.Valid(); err != nil { - return nil, err - } - if err := config.Endpoint.Valid(); err != nil { + if err := config.Valid(); err != nil { return nil, err } @@ -222,8 +223,16 @@ func (nc *Conn) AddService(config ServiceConfig) (Service, error) { return nil, err } + info := &ServiceInfo{ + Name: config.Name, + Id: config.Id, + Description: config.Description, + Version: config.Version, + Subject: config.Endpoint.Subject, + } + infoHandler := func(m *Msg) { - response, _ := json.MarshalIndent(config.ServiceInfo, "", " ") + response, _ := json.MarshalIndent(info, "", " ") m.Respond(response) } @@ -257,7 +266,7 @@ func (nc *Conn) AddService(config ServiceConfig) (Service, error) { } } - svc.stats.ID = svc.ServiceInfo.Id + svc.stats.ID = svc.Id svc.stats.Started = time.Now() return svc, nil } diff --git a/service_test.go b/service_test.go index 3b6fba682..39ffe71d3 100644 --- a/service_test.go +++ b/service_test.go @@ -53,11 +53,9 @@ func TestServiceBasics(t *testing.T) { // Create 5 ServiceImpl responders. config := ServiceConfig{ - ServiceInfo: ServiceInfo{ - Name: "CoolAddService", - Version: "v0.1", - Description: "Add things together", - }, + Name: "CoolAddService", + Version: "v0.1", + Description: "Add things together", Endpoint: Endpoint{ Subject: "svc.add", Handler: doAdd, @@ -102,6 +100,11 @@ func TestServiceBasics(t *testing.T) { if err != nil { t.Fatalf("Expected a response, got %v", err) } + inf := ServiceInfo{} + json.Unmarshal(info.Data, &inf) + if inf.Subject != "svc.add" { + t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + } fmt.Printf("\ninfo response:\n%s\n\n", info.Data) // Get stats for all the nodes. Multiple responses. From a49531e926f8dfbb68735ebedd80d0302b5fc81f Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 27 Oct 2022 12:51:05 +0200 Subject: [PATCH 6/6] Move services to separate package, clean up --- go.mod | 5 +- go.sum | 59 +----- service.go | 373 ------------------------------------ service_test.go | 145 -------------- services/errors.go | 25 +++ services/service.go | 404 +++++++++++++++++++++++++++++++++++++++ services/service_test.go | 260 +++++++++++++++++++++++++ 7 files changed, 691 insertions(+), 580 deletions(-) delete mode 100644 service.go delete mode 100644 service_test.go create mode 100644 services/errors.go create mode 100644 services/service.go create mode 100644 services/service_test.go diff --git a/go.mod b/go.mod index 9bc1fbebf..0f9e0b911 100644 --- a/go.mod +++ b/go.mod @@ -3,9 +3,6 @@ module github.com/nats-io/nats.go go 1.16 require ( - github.com/golang/protobuf v1.5.0 - github.com/nats-io/nats-server/v2 v2.9.3 // indirect github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 - google.golang.org/protobuf v1.28.1 -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index 0a05b0b51..00fe58ccd 100644 --- a/go.sum +++ b/go.sum @@ -1,68 +1,11 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= -github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= -github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.9.3 h1:HrfzA7G9LNetKkm1z+jU/e9kuAe+E6uaBuuq9EB5sQQ= -github.com/nats-io/nats-server/v2 v2.9.3/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= -github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be h1:fmw3UbQh+nxngCAHrDCCztao/kbYFnWjoqop8dHx05A= -golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec h1:BkDtF2Ih9xZ7le9ndzTA7KJow28VbQW3odyk/8drmuI= -golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af h1:Yx9k8YCG3dvF87UAn2tu2HQLf2dt/eR1bXxpLMWeH+Y= -golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= \ No newline at end of file diff --git a/service.go b/service.go deleted file mode 100644 index 863b9b759..000000000 --- a/service.go +++ /dev/null @@ -1,373 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nats - -import ( - "encoding/json" - "errors" - "fmt" - "strings" - "sync" - "time" -) - -type Service interface { - ID() string - Name() string - Description() string - Version() string - Stats() ServiceStats - Reset() - Stop() -} - -// A request handler. -// TODO (could make error more and return more info to user automatically?) -type ServiceHandler func(svc Service, req *Msg) error - -// Clients can request as well. -type ServiceStats struct { - Name string `json:"name"` - ID string `json:"id"` - Version string `json:"version"` - Started time.Time `json:"started"` - Endpoints []Stats `json:"stats"` -} -type Stats struct { - Name string `json:"name"` - NumRequests int `json:"numRequests"` - NumErrors int `json:"numErrors"` - TotalLatency time.Duration `json:"totalLatency"` - Data interface{} `json:"data"` -} - -// We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. -const QG = "svc" - -// ServiceInfo is the basic information about a service type -type ServiceInfo struct { - Name string `json:"name"` - Id string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Subject string `json:"subject"` -} - -func (s *ServiceConfig) Valid() error { - if s.Name == "" { - return errors.New("name is required") - } - return s.Endpoint.Valid() -} - -type ServiceSchema struct { - Request string `json:"request"` - Response string `json:"response"` -} - -type Endpoint struct { - Subject string `json:"subject"` - Handler ServiceHandler -} - -type InternalEndpoint struct { - Name string - Handler MsgHandler -} - -func (e *Endpoint) Valid() error { - s := strings.TrimSpace(e.Subject) - if len(s) == 0 { - return errors.New("subject is required") - } - if e.Handler == nil { - return errors.New("handler is required") - } - return nil -} - -type ServiceConfig struct { - Name string `json:"name"` - Id string `json:"id"` - Description string `json:"description"` - Version string `json:"version"` - Schema ServiceSchema `json:"schema"` - Endpoint Endpoint `json:"endpoint"` - StatusHandler func(Endpoint) interface{} -} - -// ServiceApiPrefix is the root of all control subjects -const ServiceApiPrefix = "$SRV" - -type ServiceVerb int64 - -const ( - SrvPing ServiceVerb = iota - SrvStatus - SrvInfo - SrvSchema -) - -func (s ServiceVerb) String() string { - switch s { - case SrvPing: - return "PING" - case SrvStatus: - return "STATUS" - case SrvInfo: - return "INFO" - case SrvSchema: - return "SCHEMA" - default: - return "" - } -} - -// ServiceImpl is the internal implementation of a Service -type ServiceImpl struct { - sync.Mutex - ServiceConfig - // subs - reqSub *Subscription - internal map[string]*Subscription - statuses map[string]*Stats - stats *ServiceStats -} - -// addInternalHandler generates control handlers for a specific verb -// each request generates 3 subscriptions, on for the general verb -// affecting all services written with the framework, one that handles -// all services of a particular kind, and finally a specific service. -func (svc *ServiceImpl) addInternalHandler(nc *Conn, verb ServiceVerb, handler MsgHandler) error { - name := fmt.Sprintf("%s-all", verb.String()) - if err := svc._addInternalHandler(nc, verb, "", "", name, handler); err != nil { - return err - } - name = fmt.Sprintf("%s-kind", verb.String()) - if err := svc._addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { - return err - } - return svc._addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) -} - -// _addInternalHandler registers a control subject handler -func (svc *ServiceImpl) _addInternalHandler(nc *Conn, verb ServiceVerb, kind string, id string, name string, handler MsgHandler) error { - subj, err := SvcControlSubject(verb, kind, id) - if err != nil { - svc.Stop() - return err - } - - svc.internal[name], err = nc.Subscribe(subj, func(msg *Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[name] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - svc.Unlock() - }() - handler(msg) - }) - if err != nil { - svc.Stop() - return err - } - - svc.statuses[name] = &Stats{ - Name: name, - } - return nil -} - -// AddService adds a microservice. -// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and -// also have a version that talkes full blown OpenAPI spec and we can pull these things out. -func (nc *Conn) AddService(config ServiceConfig) (Service, error) { - if err := config.Valid(); err != nil { - return nil, err - } - - svc := &ServiceImpl{ServiceConfig: config} - svc.internal = make(map[string]*Subscription) - svc.statuses = make(map[string]*Stats) - svc.statuses[""] = &Stats{ - Name: config.Name, - } - - svc.stats = &ServiceStats{ - Name: config.Name, - ID: config.Id, - Version: config.Version, - Started: time.Now(), - } - - // Setup internal subscriptions. - var err error - - svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *Msg) { - svc.reqHandler(m) - }) - if err != nil { - return nil, err - } - - info := &ServiceInfo{ - Name: config.Name, - Id: config.Id, - Description: config.Description, - Version: config.Version, - Subject: config.Endpoint.Subject, - } - - infoHandler := func(m *Msg) { - response, _ := json.MarshalIndent(info, "", " ") - m.Respond(response) - } - - pingHandler := func(m *Msg) { - infoHandler(m) - } - - statusHandler := func(m *Msg) { - response, _ := json.MarshalIndent(svc.Stats(), "", " ") - m.Respond(response) - } - - schemaHandler := func(m *Msg) { - response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") - m.Respond(response) - } - - if err := svc.addInternalHandler(nc, SrvInfo, infoHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandler(nc, SrvPing, pingHandler); err != nil { - return nil, err - } - if err := svc.addInternalHandler(nc, SrvStatus, statusHandler); err != nil { - return nil, err - } - - if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { - if err := svc.addInternalHandler(nc, SrvSchema, schemaHandler); err != nil { - return nil, err - } - } - - svc.stats.ID = svc.Id - svc.stats.Started = time.Now() - return svc, nil -} - -// reqHandler itself -func (svc *ServiceImpl) reqHandler(req *Msg) { - start := time.Now() - defer func() { - svc.Lock() - stats := svc.statuses[""] - stats.NumRequests++ - stats.TotalLatency += time.Since(start) - svc.Unlock() - }() - - if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { - svc.Lock() - stats := svc.statuses[""] - stats.NumErrors++ - svc.Unlock() - req.Sub.mu.Lock() - nc := req.Sub.conn - req.Sub.mu.Unlock() - - hdr := []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err.Error())) - nc.publish(req.Reply, _EMPTY_, hdr, nil) - } -} - -func (svc *ServiceImpl) Stop() { - if svc.reqSub != nil { - svc.reqSub.Drain() - svc.reqSub = nil - } - var keys []string - for key, sub := range svc.internal { - keys = append(keys, key) - sub.Drain() - } - for _, key := range keys { - delete(svc.internal, key) - } -} - -func (svc *ServiceImpl) ID() string { - return svc.ServiceConfig.Id -} - -func (svc *ServiceImpl) Name() string { - return svc.ServiceConfig.Name -} - -func (svc *ServiceImpl) Description() string { - return svc.ServiceConfig.Description -} - -func (svc *ServiceImpl) Version() string { - return svc.ServiceConfig.Version -} - -func (svc *ServiceImpl) Stats() ServiceStats { - svc.Lock() - defer func() { - svc.Unlock() - }() - if svc.ServiceConfig.StatusHandler != nil { - stats := svc.statuses[""] - stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) - } - idx := 0 - v := make([]Stats, len(svc.statuses)) - for _, se := range svc.statuses { - v[idx] = *se - idx++ - } - svc.stats.Endpoints = v - return *svc.stats -} - -func (svc *ServiceImpl) Reset() { - for _, se := range svc.statuses { - se.NumRequests = 0 - se.TotalLatency = 0 - se.NumErrors = 0 - se.Data = nil - } -} - -// SvcControlSubject returns monitoring subjects used by the ServiceImpl -func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { - sverb := verb.String() - if sverb == "" { - return "", fmt.Errorf("unsupported ServiceImpl verb") - } - kind = strings.ToUpper(kind) - id = strings.ToUpper(id) - if kind == "" && id == "" { - return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil - } - if id == "" { - return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil - } - return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil -} diff --git a/service_test.go b/service_test.go deleted file mode 100644 index 39ffe71d3..000000000 --- a/service_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright 2022 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package nats - -import ( - "encoding/json" - "fmt" - "math/rand" - "testing" - "time" -) - -//////////////////////////////////////////////////////////////////////////////// -// Package scoped specific tests here.. -//////////////////////////////////////////////////////////////////////////////// - -func TestServiceBasics(t *testing.T) { - s := RunServerOnPort(-1) - defer s.Shutdown() - - nc, err := Connect(s.ClientURL()) - if err != nil { - t.Fatalf("Expected to connect to server, got %v", err) - } - defer nc.Close() - - // Stub ServiceImpl. - doAdd := func(svc Service, req *Msg) error { - if rand.Intn(10) == 0 { - return fmt.Errorf("Unexpected Error!") - } - // Happy Path. - // Random delay between 5-10ms - time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) - req.Respond([]byte("42")) - return nil - } - - // Create 10 ServiceImpl responders. - - var svcs []Service - - // Create 5 ServiceImpl responders. - config := ServiceConfig{ - Name: "CoolAddService", - Version: "v0.1", - Description: "Add things together", - Endpoint: Endpoint{ - Subject: "svc.add", - Handler: doAdd, - }, - Schema: ServiceSchema{Request: "", Response: ""}, - } - - for i := 0; i < 5; i++ { - config.Id = fmt.Sprintf("%d", i) - svc, err := nc.AddService(config) - if err != nil { - t.Fatalf("Expected to create ServiceImpl, got %v", err) - } - defer svc.Stop() - svcs = append(svcs, svc) - } - - // Now send 50 requests. - for i := 0; i < 50; i++ { - _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - } - - for _, svc := range svcs { - if svc.Name() != "CoolAddService" { - t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) - } - if len(svc.Description()) == 0 || len(svc.Version()) == 0 { - t.Fatalf("Expected non emoty description and version") - } - } - - // Make sure we can request info, 1 response. - // This could be exported as well as main ServiceImpl. - subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") - if err != nil { - t.Fatalf("Failed to building info subject %v", err) - } - info, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("Expected a response, got %v", err) - } - inf := ServiceInfo{} - json.Unmarshal(info.Data, &inf) - if inf.Subject != "svc.add" { - t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) - } - fmt.Printf("\ninfo response:\n%s\n\n", info.Data) - - // Get stats for all the nodes. Multiple responses. - // could do STATZ too? - inbox := NewInbox() - sub, err := nc.SubscribeSync(inbox) - if err != nil { - t.Fatalf("subscribe failed: %s", err) - } - if err := nc.PublishRequest("svc.add.PING", inbox, nil); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - for { - resp, err := sub.NextMsg(250 * time.Millisecond) - if err != nil { - break - } - fmt.Printf("Received ping response: %s\n", resp.Data) - } - - subj, err = SvcControlSubject(SrvStatus, "CoolAddService", "") - if err != nil { - t.Fatalf("unexpected error from stats: %v", err) - } - - r, err := nc.Request(subj, nil, time.Second) - if err != nil { - t.Fatalf("request error: %v", err) - } - status := ServiceStats{} - if err := json.Unmarshal(r.Data, &status); err != nil { - t.Fatalf("unexpected error from stats: %v", err) - } - if len(status.Endpoints) != 10 { - t.Fatal("expected 10 endpoints") - } -} diff --git a/services/errors.go b/services/errors.go new file mode 100644 index 000000000..9ad1c5452 --- /dev/null +++ b/services/errors.go @@ -0,0 +1,25 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import "fmt" + +type ServiceAPIError struct { + ErrorCode int + Description string +} + +func (e *ServiceAPIError) Error() string { + return fmt.Sprintf("%d %s", e.ErrorCode, e.Description) +} diff --git a/services/service.go b/services/service.go new file mode 100644 index 000000000..0e4b61508 --- /dev/null +++ b/services/service.go @@ -0,0 +1,404 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +// Notice: Experimental Preview +// +// This functionality is EXPERIMENTAL and may be changed in later releases. + +type ( + + // Service is an interface for sevice management. + // It exposes methods to stop/reset a service, as well as get information on a service. + Service interface { + ID() string + Name() string + Description() string + Version() string + Stats() ServiceStats + Reset() + Stop() + } + + // A request handler. + // TODO (could make error more and return more info to user automatically?) + ServiceHandler func(svc Service, req *nats.Msg) error + + // Clients can request as well. + ServiceStats struct { + Name string `json:"name"` + ID string `json:"id"` + Version string `json:"version"` + Started time.Time `json:"started"` + Endpoints []Stats `json:"stats"` + } + + Stats struct { + Name string `json:"name"` + NumRequests int `json:"num_requests"` + NumErrors int `json:"num_errors"` + TotalLatency time.Duration `json:"total_latency"` + AverageLatency time.Duration `json:"average_latency"` + Data interface{} `json:"data"` + } + + // ServiceInfo is the basic information about a service type + ServiceInfo struct { + Name string `json:"name"` + ID string `json:"id"` + Description string `json:"description"` + Version string `json:"version"` + Subject string `json:"subject"` + } + + ServiceSchema struct { + Request string `json:"request"` + Response string `json:"response"` + } + + Endpoint struct { + Subject string `json:"subject"` + Handler ServiceHandler + } + + InternalEndpoint struct { + Name string + Handler nats.MsgHandler + } + + ServiceVerb int64 + + ServiceConfig struct { + Name string `json:"name"` + Description string `json:"description"` + Version string `json:"version"` + Schema ServiceSchema `json:"schema"` + Endpoint Endpoint `json:"endpoint"` + StatusHandler func(Endpoint) interface{} + } + + // service is the internal implementation of a Service + service struct { + sync.Mutex + ServiceConfig + id string + // subs + reqSub *nats.Subscription + internal map[string]*nats.Subscription + statuses map[string]*Stats + stats *ServiceStats + conn *nats.Conn + } +) + +const ( + // We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. + QG = "svc" + + // ServiceApiPrefix is the root of all control subjects + ServiceApiPrefix = "$SRV" + + ServiceErrorHeader = "Nats-Service-Error" +) + +const ( + SrvPing ServiceVerb = iota + SrvStatus + SrvInfo + SrvSchema +) + +func (s *ServiceConfig) Valid() error { + if s.Name == "" { + return errors.New("name is required") + } + return s.Endpoint.Valid() +} + +func (e *Endpoint) Valid() error { + s := strings.TrimSpace(e.Subject) + if len(s) == 0 { + return errors.New("subject is required") + } + if e.Handler == nil { + return errors.New("handler is required") + } + return nil +} + +func (s ServiceVerb) String() string { + switch s { + case SrvPing: + return "PING" + case SrvStatus: + return "STATUS" + case SrvInfo: + return "INFO" + case SrvSchema: + return "SCHEMA" + default: + return "" + } +} + +// Add adds a microservice. +// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and +// also have a version that talkes full blown OpenAPI spec and we can pull these things out. +func Add(nc *nats.Conn, config ServiceConfig) (Service, error) { + if err := config.Valid(); err != nil { + return nil, err + } + + id := nuid.Next() + svc := &service{ + ServiceConfig: config, + conn: nc, + id: id, + } + svc.internal = make(map[string]*nats.Subscription) + svc.statuses = make(map[string]*Stats) + svc.statuses[""] = &Stats{ + Name: config.Name, + } + + svc.stats = &ServiceStats{ + Name: config.Name, + ID: id, + Version: config.Version, + Started: time.Now(), + } + + // Setup internal subscriptions. + var err error + + svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { + svc.reqHandler(m) + }) + if err != nil { + return nil, err + } + + info := &ServiceInfo{ + Name: config.Name, + ID: id, + Description: config.Description, + Version: config.Version, + Subject: config.Endpoint.Subject, + } + + infoHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(info, "", " ") + m.Respond(response) + } + + pingHandler := func(m *nats.Msg) { + infoHandler(m) + } + + statusHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(svc.Stats(), "", " ") + m.Respond(response) + } + + schemaHandler := func(m *nats.Msg) { + response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ") + m.Respond(response) + } + + if err := svc.addInternalHandlerGroup(nc, SrvInfo, infoHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandlerGroup(nc, SrvPing, pingHandler); err != nil { + return nil, err + } + if err := svc.addInternalHandlerGroup(nc, SrvStatus, statusHandler); err != nil { + return nil, err + } + + if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" { + if err := svc.addInternalHandlerGroup(nc, SrvSchema, schemaHandler); err != nil { + return nil, err + } + } + + svc.stats.ID = id + svc.stats.Started = time.Now() + return svc, nil +} + +// addInternalHandlerGroup generates control handlers for a specific verb +// each request generates 3 subscriptions, one for the general verb +// affecting all services written with the framework, one that handles +// all services of a particular kind, and finally a specific service. +func (svc *service) addInternalHandlerGroup(nc *nats.Conn, verb ServiceVerb, handler nats.MsgHandler) error { + name := fmt.Sprintf("%s-all", verb.String()) + if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil { + return err + } + name = fmt.Sprintf("%s-kind", verb.String()) + if err := svc.addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil { + return err + } + return svc.addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler) +} + +// addInternalHandler registers a control subject handler +func (svc *service) addInternalHandler(nc *nats.Conn, verb ServiceVerb, kind, id, name string, handler nats.MsgHandler) error { + subj, err := SvcControlSubject(verb, kind, id) + if err != nil { + svc.Stop() + return err + } + + svc.internal[name], err = nc.Subscribe(subj, func(msg *nats.Msg) { + start := time.Now() + defer func() { + svc.Lock() + stats := svc.statuses[name] + stats.NumRequests++ + stats.TotalLatency += time.Since(start) + stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) + svc.Unlock() + }() + handler(msg) + }) + if err != nil { + svc.Stop() + return err + } + + svc.statuses[name] = &Stats{ + Name: name, + } + return nil +} + +// reqHandler itself +func (svc *service) reqHandler(req *nats.Msg) { + start := time.Now() + defer func() { + svc.Lock() + stats := svc.statuses[""] + stats.NumRequests++ + stats.TotalLatency += time.Since(start) + stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests) + svc.Unlock() + }() + + if err := svc.ServiceConfig.Endpoint.Handler(svc, req); err != nil { + hdr := make(nats.Header) + apiErr := &ServiceAPIError{} + if ok := errors.As(err, &apiErr); !ok { + hdr[ServiceErrorHeader] = []string{fmt.Sprintf("%d %s", 500, err.Error())} + } else { + hdr[ServiceErrorHeader] = []string{apiErr.Error()} + } + svc.Lock() + stats := svc.statuses[""] + stats.NumErrors++ + svc.Unlock() + + svc.conn.PublishMsg(&nats.Msg{ + Subject: req.Reply, + Header: hdr, + }) + } +} + +func (svc *service) Stop() { + if svc.reqSub != nil { + svc.reqSub.Drain() + svc.reqSub = nil + } + var keys []string + for key, sub := range svc.internal { + keys = append(keys, key) + sub.Drain() + } + for _, key := range keys { + delete(svc.internal, key) + } +} + +func (svc *service) ID() string { + return svc.id +} + +func (svc *service) Name() string { + return svc.ServiceConfig.Name +} + +func (svc *service) Description() string { + return svc.ServiceConfig.Description +} + +func (svc *service) Version() string { + return svc.ServiceConfig.Version +} + +func (svc *service) Stats() ServiceStats { + svc.Lock() + defer func() { + svc.Unlock() + }() + if svc.ServiceConfig.StatusHandler != nil { + stats := svc.statuses[""] + stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint) + } + idx := 0 + v := make([]Stats, len(svc.statuses)) + for _, se := range svc.statuses { + v[idx] = *se + idx++ + } + svc.stats.Endpoints = v + return *svc.stats +} + +func (svc *service) Reset() { + for _, se := range svc.statuses { + se.NumRequests = 0 + se.TotalLatency = 0 + se.NumErrors = 0 + se.Data = nil + } +} + +// SvcControlSubject returns monitoring subjects used by the ServiceImpl +func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) { + sverb := verb.String() + if sverb == "" { + return "", fmt.Errorf("unsupported service verb") + } + kind = strings.ToUpper(kind) + if kind == "" && id == "" { + return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil + } + if id == "" { + return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil + } + return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil +} diff --git a/services/service_test.go b/services/service_test.go new file mode 100644 index 000000000..532be8915 --- /dev/null +++ b/services/service_test.go @@ -0,0 +1,260 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package services + +import ( + "encoding/json" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + natsserver "github.com/nats-io/nats-server/v2/test" + "github.com/nats-io/nats.go" +) + +func TestServiceBasics(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + doAdd := func(svc Service, req *nats.Msg) error { + if rand.Intn(10) == 0 { + return fmt.Errorf("Unexpected Error!") + } + // Happy Path. + // Random delay between 5-10ms + time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) + if err := req.Respond([]byte("42")); err != nil { + return err + } + return nil + } + + var svcs []Service + + // Create 5 service responders. + config := ServiceConfig{ + Name: "CoolAddService", + Version: "v0.1", + Description: "Add things together", + Endpoint: Endpoint{ + Subject: "svc.add", + Handler: doAdd, + }, + Schema: ServiceSchema{Request: "", Response: ""}, + } + + for i := 0; i < 5; i++ { + svc, err := Add(nc, config) + if err != nil { + t.Fatalf("Expected to create Service, got %v", err) + } + defer svc.Stop() + svcs = append(svcs, svc) + } + + // Now send 50 requests. + for i := 0; i < 50; i++ { + _, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + } + + for _, svc := range svcs { + if svc.Name() != "CoolAddService" { + t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) + } + if len(svc.Description()) == 0 || len(svc.Version()) == 0 { + t.Fatalf("Expected non empty description and version") + } + } + + // Make sure we can request info, 1 response. + // This could be exported as well as main ServiceImpl. + subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "") + if err != nil { + t.Fatalf("Failed to building info subject %v", err) + } + info, err := nc.Request(subj, nil, time.Second) + if err != nil { + t.Fatalf("Expected a response, got %v", err) + } + var inf ServiceInfo + if err := json.Unmarshal(info.Data, &inf); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if inf.Subject != "svc.add" { + t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + } + + // Ping all services. Multiple responses. + // could do STATZ too? + inbox := nats.NewInbox() + sub, err := nc.SubscribeSync(inbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + pingSubject, err := SvcControlSubject(SrvPing, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var pingCount int + for { + _, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + pingCount++ + } + if pingCount != 5 { + t.Fatalf("Expected 5 ping responses, got: %d", pingCount) + } + + // Get stats from all services + statsInbox := nats.NewInbox() + sub, err = nc.SubscribeSync(statsInbox) + if err != nil { + t.Fatalf("subscribe failed: %s", err) + } + statsSubject, err := SvcControlSubject(SrvStatus, "CoolAddService", "") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + stats := make([]ServiceStats, 0) + var requestsNum int + for { + resp, err := sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + var srvStats ServiceStats + if err := json.Unmarshal(resp.Data, &srvStats); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if len(srvStats.Endpoints) != 10 { + t.Fatalf("Expected 10 endpoints on a serivce, got: %d", len(srvStats.Endpoints)) + } + for _, e := range srvStats.Endpoints { + if e.Name == "CoolAddService" { + requestsNum += e.NumRequests + } + } + stats = append(stats, srvStats) + } + if len(stats) != 5 { + t.Fatalf("Expected stats for 5 services, got: %d", len(stats)) + } + + // Services should process 50 requests total + if requestsNum != 50 { + t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum) + } +} + +func TestServiceErrors(t *testing.T) { + tests := []struct { + name string + handlerResponse error + expectedStatus string + }{ + { + name: "generic error", + handlerResponse: fmt.Errorf("oops"), + expectedStatus: "500 oops", + }, + { + name: "api error", + handlerResponse: &ServiceAPIError{ErrorCode: 400, Description: "oops"}, + expectedStatus: "400 oops", + }, + { + name: "no error", + handlerResponse: nil, + expectedStatus: "", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + // Stub service. + handler := func(svc Service, req *nats.Msg) error { + if test.handlerResponse == nil { + if err := req.Respond([]byte("ok")); err != nil { + return err + } + } + return test.handlerResponse + } + + svc, err := Add(nc, ServiceConfig{ + Name: "CoolService", + Description: "Erroring service", + Endpoint: Endpoint{ + Subject: "svc.fail", + Handler: handler, + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer svc.Stop() + + resp, err := nc.Request("svc.fail", nil, 1*time.Second) + if err != nil { + t.Fatalf("request error") + } + + status := resp.Header.Get("Nats-Service-Error") + if status != test.expectedStatus { + t.Fatalf("Invalid response status; want: %q; got: %q", test.expectedStatus, status) + } + }) + } +} + +func RunServerOnPort(port int) *server.Server { + opts := natsserver.DefaultTestOptions + opts.Port = port + return RunServerWithOptions(&opts) +} + +func RunServerWithOptions(opts *server.Options) *server.Server { + return natsserver.RunServer(opts) +}