From 11293d3269ee1a97cbe2d60097264065388c76b8 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Thu, 27 Oct 2022 15:14:59 -0400 Subject: [PATCH] [EXPERIMENTAL] Service framework (#1111) Signed-off-by: Derek Collison Co-authored-by: Derek Collison Co-authored-by: Alberto Ricart Collison Co-authored-by: Piotr Piotrowski --- go.mod | 2 +- go.sum | 2 +- services/errors.go | 25 +++ services/service.go | 404 +++++++++++++++++++++++++++++++++++++++ services/service_test.go | 260 +++++++++++++++++++++++++ 5 files changed, 691 insertions(+), 2 deletions(-) create mode 100644 services/errors.go create mode 100644 services/service.go create mode 100644 services/service_test.go diff --git a/go.mod b/go.mod index 63faedf43..0f9e0b911 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,4 @@ go 1.16 require ( github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index 2138ffb56..00fe58ccd 100644 --- a/go.sum +++ b/go.sum @@ -8,4 +8,4 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= \ No newline at end of file diff --git a/services/errors.go b/services/errors.go new file mode 100644 index 000000000..9ad1c5452 --- /dev/null +++ b/services/errors.go @@ -0,0 +1,25 @@ +// Copyright 2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import "fmt"
+
+type ServiceAPIError struct {
+	ErrorCode   int    // numeric code; Error renders it first
+	Description string // free-form text; Error renders it after the code
+}
+
+func (e *ServiceAPIError) Error() string {
+	return fmt.Sprintf("%d %s", e.ErrorCode, e.Description)
+}
diff --git a/services/service.go b/services/service.go
new file mode 100644
index 000000000..0e4b61508
--- /dev/null
+++ b/services/service.go
@@ -0,0 +1,449 @@
+// Copyright 2022 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nuid"
+)
+
+// Notice: Experimental Preview
+//
+// This functionality is EXPERIMENTAL and may be changed in later releases.
+
+type (
+
+	// Service is an interface for service management.
+	// It exposes methods to stop/reset a service, as well as get information on a service.
+	Service interface {
+		ID() string
+		Name() string
+		Description() string
+		Version() string
+		Stats() ServiceStats
+		Reset()
+		Stop()
+	}
+
+	// ServiceHandler is a request handler.
+	// TODO: could the error be made richer so that more info is returned to the user automatically?
+	ServiceHandler func(svc Service, req *nats.Msg) error
+
+	// ServiceStats is the statistics snapshot of one service instance. Clients can request it as well.
+	ServiceStats struct {
+		Name      string    `json:"name"`
+		ID        string    `json:"id"`
+		Version   string    `json:"version"`
+		Started   time.Time `json:"started"`
+		Endpoints []Stats   `json:"stats"`
+	}
+
+	// Stats holds the request counters and latencies of a single endpoint.
+	Stats struct {
+		Name           string        `json:"name"`
+		NumRequests    int           `json:"num_requests"`
+		NumErrors      int           `json:"num_errors"`
+		TotalLatency   time.Duration `json:"total_latency"`
+		AverageLatency time.Duration `json:"average_latency"`
+		Data           interface{}   `json:"data"`
+	}
+
+	// ServiceInfo is the basic information about a service type
+	ServiceInfo struct {
+		Name        string `json:"name"`
+		ID          string `json:"id"`
+		Description string `json:"description"`
+		Version     string `json:"version"`
+		Subject     string `json:"subject"`
+	}
+
+	// ServiceSchema carries the request and response schema strings served by the SCHEMA control handler.
+	ServiceSchema struct {
+		Request  string `json:"request"`
+		Response string `json:"response"`
+	}
+
+	// Endpoint is the subject a service listens on together with the handler invoked for each request.
+	Endpoint struct {
+		Subject string `json:"subject"`
+		Handler ServiceHandler
+	}
+
+	// InternalEndpoint pairs a name with a raw NATS message handler.
+	InternalEndpoint struct {
+		Name    string
+		Handler nats.MsgHandler
+	}
+
+	// ServiceVerb identifies a control operation (see the Srv* constants).
+	ServiceVerb int64
+
+	// ServiceConfig is the configuration passed to Add.
+	ServiceConfig struct {
+		Name          string        `json:"name"`
+		Description   string        `json:"description"`
+		Version       string        `json:"version"`
+		Schema        ServiceSchema `json:"schema"`
+		Endpoint      Endpoint      `json:"endpoint"`
+		StatusHandler func(Endpoint) interface{}
+	}
+
+	// service is the internal implementation of a Service
+	service struct {
+		sync.Mutex
+		ServiceConfig
+		id string
+		// subs
+		reqSub   *nats.Subscription
+		internal map[string]*nats.Subscription
+		statuses map[string]*Stats
+		stats    *ServiceStats
+		conn     *nats.Conn
+	}
+)
+
+const (
+	// QG is the queue group used for the service request subscription.
+	// We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc.
+	QG = "svc"
+
+	// ServiceApiPrefix is the root of all control subjects
+	ServiceApiPrefix = "$SRV"
+
+	// ServiceErrorHeader is the response header that carries "<code> <description>" when a handler fails.
+	ServiceErrorHeader = "Nats-Service-Error"
+)
+
+// Supported control verbs.
+const (
+	SrvPing ServiceVerb = iota
+	SrvStatus
+	SrvInfo
+	SrvSchema
+)
+
+// Valid returns an error when the name is empty or the endpoint is not valid.
+func (s *ServiceConfig) Valid() error {
+	if s.Name == "" {
+		return errors.New("name is required")
+	}
+	return s.Endpoint.Valid()
+}
+
+// Valid returns an error when the subject is blank or the handler is nil.
+func (e *Endpoint) Valid() error {
+	s := strings.TrimSpace(e.Subject)
+	if len(s) == 0 {
+		return errors.New("subject is required")
+	}
+	if e.Handler == nil {
+		return errors.New("handler is required")
+	}
+	return nil
+}
+
+// String returns the upper-case name of the verb, or "" for a value that is not a known verb.
+func (s ServiceVerb) String() string {
+	switch s {
+	case SrvPing:
+		return "PING"
+	case SrvStatus:
+		return "STATUS"
+	case SrvInfo:
+		return "INFO"
+	case SrvSchema:
+		return "SCHEMA"
+	default:
+		return ""
+	}
+}
+
+// Add adds a microservice: it queue-subscribes the configured handler to the endpoint subject and
+// registers the INFO, PING and STATUS control handlers, plus SCHEMA when a schema is configured.
+// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and
+// also have a version that takes full blown OpenAPI spec and we can pull these things out.
+func Add(nc *nats.Conn, config ServiceConfig) (Service, error) {
+	if err := config.Valid(); err != nil {
+		return nil, err
+	}
+
+	id := nuid.Next()
+	svc := &service{
+		ServiceConfig: config,
+		conn:          nc,
+		id:            id,
+	}
+	svc.internal = make(map[string]*nats.Subscription)
+	svc.statuses = make(map[string]*Stats)
+	svc.statuses[""] = &Stats{
+		Name: config.Name,
+	}
+
+	svc.stats = &ServiceStats{
+		Name:    config.Name,
+		ID:      id,
+		Version: config.Version,
+		Started: time.Now(),
+	}
+
+	// Setup internal subscriptions.
+	var err error
+
+	svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) {
+		svc.reqHandler(m)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	info := &ServiceInfo{
+		Name:        config.Name,
+		ID:          id,
+		Description: config.Description,
+		Version:     config.Version,
+		Subject:     config.Endpoint.Subject,
+	}
+
+	infoHandler := func(m *nats.Msg) {
+		response, _ := json.MarshalIndent(info, "", " ")
+		m.Respond(response)
+	}
+
+	pingHandler := func(m *nats.Msg) {
+		infoHandler(m)
+	}
+
+	statusHandler := func(m *nats.Msg) {
+		response, _ := json.MarshalIndent(svc.Stats(), "", " ")
+		m.Respond(response)
+	}
+
+	schemaHandler := func(m *nats.Msg) {
+		response, _ := json.MarshalIndent(svc.ServiceConfig.Schema, "", " ")
+		m.Respond(response)
+	}
+
+	if err := svc.addInternalHandlerGroup(nc, SrvInfo, infoHandler); err != nil {
+		return nil, err
+	}
+	if err := svc.addInternalHandlerGroup(nc, SrvPing, pingHandler); err != nil {
+		return nil, err
+	}
+	if err := svc.addInternalHandlerGroup(nc, SrvStatus, statusHandler); err != nil {
+		return nil, err
+	}
+
+	if svc.ServiceConfig.Schema.Request != "" || svc.ServiceConfig.Schema.Response != "" {
+		if err := svc.addInternalHandlerGroup(nc, SrvSchema, schemaHandler); err != nil {
+			return nil, err
+		}
+	}
+
+	svc.stats.ID = id
+	svc.stats.Started = time.Now()
+	return svc, nil
+}
+
+// addInternalHandlerGroup generates control handlers for a specific verb
+// each request generates 3 subscriptions, one for the general verb
+// affecting all services written with the framework, one that handles
+// all services of a particular kind, and finally a specific service.
+func (svc *service) addInternalHandlerGroup(nc *nats.Conn, verb ServiceVerb, handler nats.MsgHandler) error {
+	name := fmt.Sprintf("%s-all", verb.String())
+	if err := svc.addInternalHandler(nc, verb, "", "", name, handler); err != nil {
+		return err
+	}
+	name = fmt.Sprintf("%s-kind", verb.String())
+	if err := svc.addInternalHandler(nc, verb, svc.Name(), "", name, handler); err != nil {
+		return err
+	}
+	return svc.addInternalHandler(nc, verb, svc.Name(), svc.ID(), verb.String(), handler)
+}
+
+// addInternalHandler registers a control subject handler.
+// The stats entry is created before subscribing so that the callback, which may
+// run as soon as the subscription exists, never finds a missing entry.
+// On any failure the service is stopped and the error is returned.
+func (svc *service) addInternalHandler(nc *nats.Conn, verb ServiceVerb, kind, id, name string, handler nats.MsgHandler) error {
+	subj, err := SvcControlSubject(verb, kind, id)
+	if err != nil {
+		svc.Stop()
+		return err
+	}
+
+	stats := &Stats{
+		Name: name,
+	}
+	svc.Lock()
+	svc.statuses[name] = stats
+	svc.Unlock()
+
+	sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
+		start := time.Now()
+		defer func() {
+			svc.Lock()
+			stats.NumRequests++
+			stats.TotalLatency += time.Since(start)
+			stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests)
+			svc.Unlock()
+		}()
+		handler(msg)
+	})
+	if err != nil {
+		svc.Lock()
+		delete(svc.statuses, name)
+		svc.Unlock()
+		svc.Stop()
+		return err
+	}
+
+	svc.Lock()
+	svc.internal[name] = sub
+	svc.Unlock()
+	return nil
+}
+
+// reqHandler runs the configured endpoint handler for one request and records
+// its latency. A handler error is counted and, when the request has a reply
+// subject, reported in the ServiceErrorHeader header: a *ServiceAPIError as
+// "<code> <description>", any other error as "500 <message>".
+func (svc *service) reqHandler(req *nats.Msg) {
+	start := time.Now()
+	defer func() {
+		svc.Lock()
+		stats := svc.statuses[""]
+		stats.NumRequests++
+		stats.TotalLatency += time.Since(start)
+		stats.AverageLatency = stats.TotalLatency / time.Duration(stats.NumRequests)
+		svc.Unlock()
+	}()
+
+	err := svc.ServiceConfig.Endpoint.Handler(svc, req)
+	if err == nil {
+		return
+	}
+
+	svc.Lock()
+	svc.statuses[""].NumErrors++
+	svc.Unlock()
+
+	// Without a reply subject there is nobody to report the error to.
+	if req.Reply == "" {
+		return
+	}
+
+	status := fmt.Sprintf("%d %s", 500, err.Error())
+	var apiErr *ServiceAPIError
+	if errors.As(err, &apiErr) {
+		status = apiErr.Error()
+	}
+	// Best effort: a publish failure cannot be reported back to the requester.
+	_ = svc.conn.PublishMsg(&nats.Msg{
+		Subject: req.Reply,
+		Header:  nats.Header{ServiceErrorHeader: []string{status}},
+	})
+}
+
+// Stop drains the request subscription and every control subscription and
+// then forgets them, so calling it again is a no-op.
+func (svc *service) Stop() {
+	svc.Lock()
+	defer svc.Unlock()
+	if svc.reqSub != nil {
+		svc.reqSub.Drain()
+		svc.reqSub = nil
+	}
+	// Deleting the current key while ranging over a map is safe in Go.
+	for key, sub := range svc.internal {
+		sub.Drain()
+		delete(svc.internal, key)
+	}
+}
+
+// ID returns the instance id generated when the service was added.
+func (svc *service) ID() string {
+	return svc.id
+}
+
+// Name returns the configured service name.
+func (svc *service) Name() string {
+	return svc.ServiceConfig.Name
+}
+
+// Description returns the configured service description.
+func (svc *service) Description() string {
+	return svc.ServiceConfig.Description
+}
+
+// Version returns the configured service version.
+func (svc *service) Version() string {
+	return svc.ServiceConfig.Version
+}
+
+// Stats returns a snapshot of the service statistics with one entry per
+// endpoint (the request endpoint plus every control handler). When a
+// StatusHandler is configured its result is first stored as Data on the
+// request endpoint entry. The order of the entries is unspecified.
+func (svc *service) Stats() ServiceStats {
+	svc.Lock()
+	defer svc.Unlock()
+	if svc.ServiceConfig.StatusHandler != nil {
+		stats := svc.statuses[""]
+		stats.Data = svc.ServiceConfig.StatusHandler(svc.Endpoint)
+	}
+	v := make([]Stats, 0, len(svc.statuses))
+	for _, se := range svc.statuses {
+		v = append(v, *se)
+	}
+	svc.stats.Endpoints = v
+	return *svc.stats
+}
+
+// Reset zeroes the counters, latencies and custom data of every endpoint.
+func (svc *service) Reset() {
+	svc.Lock()
+	defer svc.Unlock()
+	for _, se := range svc.statuses {
+		se.NumRequests = 0
+		se.NumErrors = 0
+		se.TotalLatency = 0
+		se.AverageLatency = 0
+		se.Data = nil
+	}
+}
+
+// SvcControlSubject returns the control subject for a verb: $SRV.<VERB> when
+// kind and id are both empty, $SRV.<VERB>.<KIND> when only id is empty, and
+// $SRV.<VERB>.<KIND>.<id> otherwise. kind is upper-cased; an unknown verb is an error.
+func SvcControlSubject(verb ServiceVerb, kind, id string) (string, error) {
+	sverb := verb.String()
+	if sverb == "" {
+		return "", errors.New("unsupported service verb")
+	}
+	kind = strings.ToUpper(kind)
+	if kind == "" && id == "" {
+		return fmt.Sprintf("%s.%s", ServiceApiPrefix, sverb), nil
+	}
+	if id == "" {
+		return fmt.Sprintf("%s.%s.%s", ServiceApiPrefix, sverb, kind), nil
+	}
+	return fmt.Sprintf("%s.%s.%s.%s", ServiceApiPrefix, sverb, kind, id), nil
+}
diff --git a/services/service_test.go b/services/service_test.go
new file mode 100644
index 000000000..532be8915
--- /dev/null
+++ b/services/service_test.go
@@ -0,0 +1,260 @@
+// Copyright 2022 The NATS Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not
use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package services
+
+import (
+	"encoding/json"
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/nats-io/nats-server/v2/server"
+	natsserver "github.com/nats-io/nats-server/v2/test"
+	"github.com/nats-io/nats.go"
+)
+
+func TestServiceBasics(t *testing.T) {
+	s := RunServerOnPort(-1)
+	defer s.Shutdown()
+
+	nc, err := nats.Connect(s.ClientURL())
+	if err != nil {
+		t.Fatalf("Expected to connect to server, got %v", err)
+	}
+	defer nc.Close()
+
+	// Stub service: fails about one request in ten, otherwise replies "42" after a short delay.
+	doAdd := func(svc Service, req *nats.Msg) error {
+		if rand.Intn(10) == 0 {
+			return fmt.Errorf("Unexpected Error!")
+		}
+		// Happy Path.
+		// Random delay of 5-9ms (rand.Intn(5) yields 0-4).
+		time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond)
+		if err := req.Respond([]byte("42")); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	var svcs []Service
+
+	// Create 5 service responders, all from the same config.
+	config := ServiceConfig{
+		Name:        "CoolAddService",
+		Version:     "v0.1",
+		Description: "Add things together",
+		Endpoint: Endpoint{
+			Subject: "svc.add",
+			Handler: doAdd,
+		},
+		Schema: ServiceSchema{Request: "", Response: ""},
+	}
+
+	for i := 0; i < 5; i++ {
+		svc, err := Add(nc, config)
+		if err != nil {
+			t.Fatalf("Expected to create Service, got %v", err)
+		}
+		defer svc.Stop()
+		svcs = append(svcs, svc)
+	}
+
+	// Now send 50 requests; a failing handler still answers (with an error header), so each must get a response.
+	for i := 0; i < 50; i++ {
+		_, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second)
+		if err != nil {
+			t.Fatalf("Expected a response, got %v", err)
+		}
+	}
+
+	for _, svc := range svcs {
+		if svc.Name() != "CoolAddService" {
+			t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name())
+		}
+		if len(svc.Description()) == 0 || len(svc.Version()) == 0 {
+			t.Fatalf("Expected non empty description and version")
+		}
+	}
+
+	// Make sure we can request info; nc.Request hands back a single response.
+	// This could be exported as well as main ServiceImpl.
+	subj, err := SvcControlSubject(SrvInfo, "CoolAddService", "")
+	if err != nil {
+		t.Fatalf("Failed to building info subject %v", err)
+	}
+	info, err := nc.Request(subj, nil, time.Second)
+	if err != nil {
+		t.Fatalf("Expected a response, got %v", err)
+	}
+	var inf ServiceInfo
+	if err := json.Unmarshal(info.Data, &inf); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if inf.Subject != "svc.add" {
+		t.Fatalf("expected service subject to be srv.add: %s", inf.Subject)
+	}
+
+	// Ping all services of this kind; one reply per instance is expected on the inbox.
+	// could do STATZ too?
+	inbox := nats.NewInbox()
+	sub, err := nc.SubscribeSync(inbox)
+	if err != nil {
+		t.Fatalf("subscribe failed: %s", err)
+	}
+	pingSubject, err := SvcControlSubject(SrvPing, "CoolAddService", "")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if err := nc.PublishRequest(pingSubject, inbox, nil); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	var pingCount int
+	for { // count replies until NextMsg returns an error (e.g. nothing arrives within 250ms)
+		_, err := sub.NextMsg(250 * time.Millisecond)
+		if err != nil {
+			break
+		}
+		pingCount++
+	}
+	if pingCount != 5 {
+		t.Fatalf("Expected 5 ping responses, got: %d", pingCount)
+	}
+
+	// Get stats from all services.
+	statsInbox := nats.NewInbox()
+	sub, err = nc.SubscribeSync(statsInbox)
+	if err != nil {
+		t.Fatalf("subscribe failed: %s", err)
+	}
+	statsSubject, err := SvcControlSubject(SrvStatus, "CoolAddService", "")
+	if err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+	if err := nc.PublishRequest(statsSubject, statsInbox, nil); err != nil {
+		t.Fatalf("Unexpected error: %v", err)
+	}
+
+	stats := make([]ServiceStats, 0)
+	var requestsNum int
+	for { // collect replies until NextMsg returns an error (e.g. nothing arrives within 250ms)
+		resp, err := sub.NextMsg(250 * time.Millisecond)
+		if err != nil {
+			break
+		}
+		var srvStats ServiceStats
+		if err := json.Unmarshal(resp.Data, &srvStats); err != nil {
+			t.Fatalf("Unexpected error: %v", err)
+		}
+		if len(srvStats.Endpoints) != 10 { // request endpoint + 3 control subjects for each of INFO, PING, STATUS
+			t.Fatalf("Expected 10 endpoints on a serivce, got: %d", len(srvStats.Endpoints))
+		}
+		for _, e := range srvStats.Endpoints {
+			if e.Name == "CoolAddService" { // the request endpoint entry is named after the service
+				requestsNum += e.NumRequests
+			}
+		}
+		stats = append(stats, srvStats)
+	}
+	if len(stats) != 5 {
+		t.Fatalf("Expected stats for 5 services, got: %d", len(stats))
+	}
+
+	// Across all instances the request endpoints should have processed the 50 requests sent above.
+	if requestsNum != 50 {
+		t.Fatalf("Expected a total fo 50 requests processed, got: %d", requestsNum)
+	}
+}
+
+func TestServiceErrors(t *testing.T) {
+	tests := []struct {
+		name            string
+		handlerResponse error
+		expectedStatus  string
+	}{
+		{
+			name:            "generic error",
+			handlerResponse: fmt.Errorf("oops"),
+			expectedStatus:  "500 oops",
+		},
+		{
+			name:            "api error",
+			handlerResponse: &ServiceAPIError{ErrorCode: 400, Description: "oops"},
+			expectedStatus:  "400 oops",
+		},
+		{
+			name:            "no error",
+			handlerResponse: nil,
+			expectedStatus:  "",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			s := RunServerOnPort(-1)
+			defer s.Shutdown()
+
+			nc, err := nats.Connect(s.ClientURL())
+			if err != nil {
+				t.Fatalf("Expected to connect to server, got %v", err)
+			}
+			defer nc.Close()
+
+			// Stub service: replies "ok" when the case has no error, otherwise returns the case's error.
+			handler := func(svc Service, req *nats.Msg) error {
+				if test.handlerResponse == nil {
+					if err := req.Respond([]byte("ok")); err != nil {
+						return err
+					}
+				}
+				return test.handlerResponse
+			}
+
+			svc, err := Add(nc, ServiceConfig{
+				Name:        "CoolService",
+				Description: "Erroring service",
+				Endpoint: Endpoint{
+					Subject: "svc.fail",
+					Handler: handler,
+				},
+			})
+			if err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+			defer svc.Stop()
+
+			resp, err := nc.Request("svc.fail", nil, 1*time.Second)
+			if err != nil {
+				t.Fatalf("request error")
+			}
+
+			status := resp.Header.Get("Nats-Service-Error") // empty when the handler succeeded
+			if status != test.expectedStatus {
+				t.Fatalf("Invalid response status; want: %q; got: %q", test.expectedStatus, status)
+			}
+		})
+	}
+}
+
+func RunServerOnPort(port int) *server.Server { // starts a test server from the default test options on the given port
+	opts := natsserver.DefaultTestOptions
+	opts.Port = port
+	return RunServerWithOptions(&opts)
+}
+
+func RunServerWithOptions(opts *server.Options) *server.Server { // thin wrapper around natsserver.RunServer
+	return natsserver.RunServer(opts)
+}