New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[EXPERIMENTAL] Service framework #1111
Merged
Merged
Changes from 1 commit
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
caedfdf
Services WIP
derekcollison b1007e7
[WIP] align services in go to javascript poc
aricart 680718f
lint
aricart bba377a
fixed data race
aricart 43050b2
added subject field to ServiceInfo
aricart a49531e
Move services to separate package, clean up
piotrpio File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
// Copyright 2022 The NATS Authors | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package nats | ||
|
||
import ( | ||
"encoding/json" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/nats-io/nuid" | ||
) | ||
|
||
type Service interface { | ||
ID() string | ||
Name() string | ||
Description() string | ||
Version() string | ||
|
||
// Stats | ||
Stats() Stats | ||
Reset() | ||
|
||
Close() | ||
} | ||
|
||
// A request handler. | ||
// TODO (could make error more and return more info to user automatically?) | ||
type RequestHandler func(svc Service, req *Msg) error | ||
|
||
// Clients can request as well. | ||
type Stats struct { | ||
ID string | ||
Started time.Time | ||
NumRequests int | ||
NumErrors int | ||
TotalLatency time.Duration | ||
} | ||
|
||
// We can fix this, as versions will be on separate subjects and use account mapping to roll requests to new versions etc. | ||
const QG = "svc" | ||
|
||
// When a request for info comes in we return json of this. | ||
type ServiceInfo struct { | ||
Name string | ||
Description string | ||
Version string | ||
} | ||
|
||
// Internal struct | ||
type service struct { | ||
id string | ||
name string | ||
description string | ||
version string | ||
stats Stats | ||
// subs | ||
reqSub *Subscription | ||
// info | ||
infoSub *Subscription | ||
// stats | ||
pingSub *Subscription | ||
|
||
cb RequestHandler | ||
} | ||
|
||
// Add a microservice. | ||
// NOTE we can do an OpenAPI version as well, but looking at it it was very involved. So I think keep simple version and | ||
// also have a version that talkes full blown OpenAPI spec and we can pull these things out. | ||
func (nc *Conn) AddService(name, description, version, subject, reqSchema, respSchema string, reqHandler RequestHandler) (Service, error) { | ||
|
||
// NOTE WIP | ||
svc := &service{id: nuid.Next(), name: name, description: description, version: version, cb: reqHandler} | ||
|
||
// Setup internal subscriptions. | ||
var err error | ||
|
||
svc.reqSub, err = nc.QueueSubscribe(subject, QG, func(m *Msg) { svc.reqHandler(m) }) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Construct info sub from main subject. | ||
info := fmt.Sprintf("%s.INFO", subject) | ||
svc.infoSub, err = nc.QueueSubscribe(info, QG, func(m *Msg) { | ||
si := &ServiceInfo{ | ||
Name: svc.name, | ||
Description: svc.description, | ||
Version: svc.version, | ||
} | ||
response, _ := json.MarshalIndent(si, "", " ") | ||
m.Respond(response) | ||
}) | ||
if err != nil { | ||
svc.Close() | ||
return nil, err | ||
} | ||
|
||
// Construct ping sub from main subject. | ||
// These will be responded by all handlers. | ||
ping := fmt.Sprintf("%s.PING", subject) | ||
svc.infoSub, err = nc.Subscribe(ping, func(m *Msg) { | ||
response, _ := json.MarshalIndent(svc.stats, "", " ") | ||
m.Respond(response) | ||
}) | ||
if err != nil { | ||
svc.Close() | ||
return nil, err | ||
} | ||
|
||
svc.stats.ID = svc.id | ||
svc.stats.Started = time.Now() | ||
return svc, nil | ||
} | ||
|
||
// reqHandler itself | ||
func (svc *service) reqHandler(req *Msg) { | ||
start := time.Now() | ||
defer func() { | ||
svc.stats.NumRequests++ | ||
svc.stats.TotalLatency += time.Since(start) | ||
}() | ||
|
||
if err := svc.cb(svc, req); err != nil { | ||
svc.stats.NumErrors++ | ||
req.Sub.mu.Lock() | ||
nc := req.Sub.conn | ||
req.Sub.mu.Unlock() | ||
|
||
hdr := []byte(fmt.Sprintf("NATS/1.0 500 %s\r\n\r\n", err.Error())) | ||
nc.publish(req.Reply, _EMPTY_, hdr, nil) | ||
} | ||
} | ||
|
||
func (svc *service) Close() { | ||
if svc.reqSub != nil { | ||
svc.reqSub.Drain() | ||
svc.reqSub = nil | ||
} | ||
if svc.infoSub != nil { | ||
svc.infoSub.Drain() | ||
svc.infoSub = nil | ||
} | ||
if svc.pingSub != nil { | ||
svc.pingSub.Drain() | ||
svc.pingSub = nil | ||
} | ||
} | ||
|
||
func (svc *service) ID() string { | ||
return svc.id | ||
} | ||
|
||
func (svc *service) Name() string { | ||
return svc.name | ||
} | ||
|
||
func (svc *service) Description() string { | ||
return svc.description | ||
} | ||
|
||
func (svc *service) Version() string { | ||
return svc.version | ||
} | ||
|
||
func (svc *service) Stats() Stats { | ||
return svc.stats | ||
} | ||
|
||
func (svc *service) Reset() { | ||
svc.stats = Stats{} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Copyright 2022 The NATS Authors | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package nats | ||
|
||
import ( | ||
"fmt" | ||
"math/rand" | ||
"testing" | ||
"time" | ||
) | ||
|
||
//////////////////////////////////////////////////////////////////////////////// | ||
// Package scoped specific tests here.. | ||
//////////////////////////////////////////////////////////////////////////////// | ||
|
||
func TestServiceBasics(t *testing.T) { | ||
s := RunServerOnPort(-1) | ||
defer s.Shutdown() | ||
|
||
nc, err := Connect(s.ClientURL()) | ||
if err != nil { | ||
t.Fatalf("Expected to connect to server, got %v", err) | ||
} | ||
defer nc.Close() | ||
|
||
// Stub service. | ||
doAdd := func(svc Service, req *Msg) error { | ||
if rand.Intn(10) == 0 { | ||
return fmt.Errorf("Unexpected Error!") | ||
} | ||
// Happy Path. | ||
// Random delay between 5-10ms | ||
time.Sleep(5*time.Millisecond + time.Duration(rand.Intn(5))*time.Millisecond) | ||
req.Respond([]byte("42")) | ||
return nil | ||
} | ||
|
||
// Create 10 service responders. | ||
|
||
var svcs []Service | ||
|
||
// Create 5 service responders. | ||
for i := 0; i < 5; i++ { | ||
svc, err := nc.AddService( | ||
"CoolAddService", | ||
"Add things together", | ||
"v0.1", | ||
"svc.add", | ||
_EMPTY_, // TBD - request schema | ||
_EMPTY_, // TBD - response schema | ||
doAdd, | ||
) | ||
if err != nil { | ||
t.Fatalf("Expected to create service, got %v", err) | ||
} | ||
defer svc.Close() | ||
svcs = append(svcs, svc) | ||
} | ||
|
||
// Now send 50 requests. | ||
for i := 0; i < 50; i++ { | ||
_, err := nc.Request("svc.add", []byte(`{ "x": 22, "y": 11 }`), time.Second) | ||
if err != nil { | ||
t.Fatalf("Expected a response, got %v", err) | ||
} | ||
} | ||
|
||
for _, svc := range svcs { | ||
if svc.Name() != "CoolAddService" { | ||
t.Fatalf("Expected %q, got %q", "CoolAddService", svc.Name()) | ||
} | ||
if len(svc.Description()) == 0 || len(svc.Version()) == 0 { | ||
t.Fatalf("Expected non emoty description and version") | ||
} | ||
} | ||
|
||
// Make sure we can request info, 1 response. | ||
// This could be exported as well as main service. | ||
info, err := nc.Request("svc.add.INFO", nil, time.Second) | ||
if err != nil { | ||
t.Fatalf("Expected a response, got %v", err) | ||
} | ||
fmt.Printf("\ninfo response:\n%s\n\n", info.Data) | ||
|
||
// Get stats for all the nodes. Multiple responses. | ||
// could do STATZ too? | ||
inbox := NewInbox() | ||
sub, err := nc.SubscribeSync(inbox) | ||
if err != nil { | ||
t.Fatalf("subscribe failed: %s", err) | ||
} | ||
if err := nc.PublishRequest("svc.add.PING", inbox, nil); err != nil { | ||
t.Fatalf("Unexpected error: %v", err) | ||
} | ||
|
||
for { | ||
resp, err := sub.NextMsg(250 * time.Millisecond) | ||
if err != nil { | ||
break | ||
} | ||
fmt.Printf("Received ping response: %s\n", resp.Data) | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have the subject you can send requests to here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is part of the configuration for the service - part of the Endpoint struct - the reason for the indirection is that if when we support JetStream this will be a different kind of Endpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I think an info request from a NATS cli or random app should get the subject back from this call as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah! - that we can easily do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed!