From 632b0f86c3e475e5c94190b642cba6210c3ad889 Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Wed, 6 Apr 2022 20:01:40 -0600 Subject: [PATCH 1/2] add a standard subscription [ci-skip] Signed-off-by: Colin Sullivan --- examples/nats-echo/main.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/examples/nats-echo/main.go b/examples/nats-echo/main.go index 4ad228691..2a4d4056e 100644 --- a/examples/nats-echo/main.go +++ b/examples/nats-echo/main.go @@ -44,7 +44,7 @@ func showUsageAndExit(exitcode int) { } func printMsg(m *nats.Msg, i int) { - log.Printf("[#%d] Echoing to [%s]: %q", i, m.Reply, m.Data) + log.Printf("[#%d] Echoing from [%s] to [%s]: %q", i, m.Subject, m.Reply, m.Data) } func main() { @@ -103,7 +103,7 @@ func main() { subj, i := args[0], 0 - nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) { + handleMsg := func(msg *nats.Msg) { i++ if msg.Reply != "" { printMsg(msg, i) @@ -115,6 +115,14 @@ func main() { nc.Publish(msg.Reply, msg.Data) } } + } + + nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) { + handleMsg(msg) + }) + allSubj := subj + ".all" + nc.Subscribe(allSubj, func(msg *nats.Msg) { + handleMsg(msg) }) nc.Flush() @@ -123,8 +131,9 @@ func main() { } log.Printf("Echo Service listening on [%s]\n", subj) + log.Printf("Echo Service (All) listening on [%s]\n", allSubj) - // Now handle signal to terminate so we cam drain on exit. + // Now handle signal to terminate so we can drain on exit. c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT) From f6e4f8dba5d98c2441ddffebbf4b67dc8ee39b8e Mon Sep 17 00:00:00 2001 From: Colin Sullivan Date: Thu, 7 Apr 2022 13:32:02 -0600 Subject: [PATCH 2/2] add geo URL and status JSON Signed-off-by: Colin Sullivan --- examples/nats-echo/main.go | 59 +++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/examples/nats-echo/main.go b/examples/nats-echo/main.go index 2a4d4056e..b78ea91a1 100644 --- a/examples/nats-echo/main.go +++ b/examples/nats-echo/main.go @@ -47,14 +47,24 @@ func printMsg(m *nats.Msg, i int) { log.Printf("[#%d] Echoing from [%s] to [%s]: %q", i, m.Subject, m.Reply, m.Data) } +func printStatusMsg(m *nats.Msg, i int) { + log.Printf("[#%d] Sending status from [%s] to [%s]: %q", i, m.Subject, m.Reply, m.Data) +} + +type serviceStatus struct { + Id string `json:"id"` + Geo string `json:"geo"` +} + func main() { var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)") var userCreds = flag.String("creds", "", "User Credentials File") var nkeyFile = flag.String("nkey", "", "NKey Seed File") + var serviceId = flag.String("id", "NATS Echo Service", "Identifier for this service") var showTime = flag.Bool("t", false, "Display timestamps") var showHelp = flag.Bool("h", false, "Show help message") var geoloc = flag.Bool("geo", false, "Display geo location of echo service") - var geo string + var geo string = "unknown" log.SetFlags(0) flag.Usage = usage @@ -74,7 +84,7 @@ func main() { geo = lookupGeo() } // Connect Options. - opts := []nats.Option{nats.Name("NATS Echo Service")} + opts := []nats.Option{nats.Name(*serviceId)} opts = setupConnOptions(opts) if *userCreds != "" && *nkeyFile != "" { @@ -101,28 +111,30 @@ func main() { log.Fatal(err) } - subj, i := args[0], 0 + subj, iEcho, iStatus := args[0], 0, 0 + statusSubj := subj + ".status" - handleMsg := func(msg *nats.Msg) { - i++ + nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) { + iEcho++ + printMsg(msg, iEcho) if msg.Reply != "" { - printMsg(msg, i) // Just echo back what they sent us. - if geo != "" { - m := fmt.Sprintf("[%s]: %q", geo, msg.Data) - nc.Publish(msg.Reply, []byte(m)) + var payload []byte + if geo != "unknown" { + payload = []byte(fmt.Sprintf("[%s]: %q", geo, msg.Data)) } else { - nc.Publish(msg.Reply, msg.Data) + payload = msg.Data } + nc.Publish(msg.Reply, payload) } - } - - nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) { - handleMsg(msg) }) - allSubj := subj + ".all" - nc.Subscribe(allSubj, func(msg *nats.Msg) { - handleMsg(msg) + nc.Subscribe(statusSubj, func(msg *nats.Msg) { + iStatus++ + printStatusMsg(msg, iStatus) + if msg.Reply != "" { + payload, _ := json.Marshal(&serviceStatus{Id: *serviceId, Geo: geo}) + nc.Publish(msg.Reply, payload) + } }) nc.Flush() @@ -130,8 +142,9 @@ func main() { log.Fatal(err) } + log.Printf("Echo Service ID: [%s]", *serviceId) log.Printf("Echo Service listening on [%s]\n", subj) - log.Printf("Echo Service (All) listening on [%s]\n", allSubj) + log.Printf("Echo Service (Status) listening on [%s]\n", statusSubj) // Now handle signal to terminate so we can drain on exit. c := make(chan os.Signal, 1) @@ -185,9 +198,15 @@ type geo struct { // lookup our current region and country.. func lookupGeo() string { c := &http.Client{Timeout: 2 * time.Second} - resp, err := c.Get("https://ipapi.co/json") + + url := os.Getenv("ECHO_SVC_GEO_URL") + if len(url) == 0 { + url = "https://ipapi.co/json" + } + + resp, err := c.Get(url) if err != nil || resp == nil { - log.Fatalf("Could not retrive geo location data: %v", err) + log.Fatalf("Could not retrieve geo location data: %v", err) } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body)