Skip to content

Commit

Permalink
Register dispatcher in the default dig graph (#304)
Browse files Browse the repository at this point in the history
Right now people are confused about rpc.Dispatcher(),
which returns you a dispatcher only if service is started 
and testing handlers becomes much harder. 

Proposed solution is to inject it in the default dig graph.
  • Loading branch information
Alex committed Feb 22, 2017
1 parent 96faff9 commit b897b67
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 25 deletions.
3 changes: 2 additions & 1 deletion modules/rpc/thrift.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ func newYARPCThriftModule(
reg := func(mod *YARPCModule) {
_setupMu.Lock()
defer _setupMu.Unlock()
Dispatcher().Register(registrants)

mod.controller.dispatcher.Register(registrants)
}

return newYARPCModule(mi, reg, options...)
Expand Down
10 changes: 9 additions & 1 deletion modules/rpc/thrift_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"testing"

"go.uber.org/fx/config"
"go.uber.org/fx/dig"
"go.uber.org/fx/modules"
"go.uber.org/fx/service"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/encoding/thrift"
)
Expand Down Expand Up @@ -74,6 +76,11 @@ modules:

testInitRunModule(t, goofy[0], mci)
testInitRunModule(t, gopher[0], mci)

// Dispatcher must be resolved in the default graph
var dispatcher *yarpc.Dispatcher
assert.NoError(t, dig.Resolve(&dispatcher))
assert.Equal(t, 2, len(dispatcher.Inbounds()))
}

func TestThriftModule_BadOptions(t *testing.T) {
Expand Down Expand Up @@ -105,7 +112,8 @@ func testInitRunModule(t *testing.T, mod service.Module, mci service.ModuleCreat

func mch() service.ModuleCreateInfo {
return service.ModuleCreateInfo{
Host: service.NopHost(),
Host: service.NopHost(),
Items: make(map[string]interface{}),
}
}

Expand Down
78 changes: 55 additions & 23 deletions modules/rpc/yarpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
"strconv"
"sync"

"go.uber.org/fx/modules"
Expand All @@ -32,6 +33,7 @@ import (
"go.uber.org/fx/ulog"

errs "github.com/pkg/errors"
"go.uber.org/fx/dig"
"go.uber.org/yarpc"
"go.uber.org/yarpc/api/middleware"
"go.uber.org/yarpc/api/transport"
Expand All @@ -42,13 +44,17 @@ import (
// YARPCModule is an implementation of a core RPC module using YARPC.
// All the YARPC modules share the same dispatcher and middleware.
// Dispatcher will start when any created module calls Start().
// The YARPC team advised dispatcher to be a 'singleton' to control
// the lifecycle of all of the in/out bound traffic, so we will
// register it in a dig.Graph provided with options/default graph.
type YARPCModule struct {
modules.ModuleBase
register registerServiceFunc
config yarpcConfig
log ulog.Log
stateMu sync.RWMutex
isRunning bool
register registerServiceFunc
config yarpcConfig
log ulog.Log
stateMu sync.RWMutex
isRunning bool
controller *dispatcherController
}

var (
Expand All @@ -61,12 +67,6 @@ var (
_starterFn = defaultYARPCStarter

_ service.Module = &YARPCModule{}

// Controller represents a collection of all YARPC configs
// that are stored together to create a shared dispatcher.
// The YARPC team advised it to be a 'singleton' to control
// the lifecycle of all of the in/out bound traffic.
_controller dispatcherController
)

type registerServiceFunc func(module *YARPCModule)
Expand All @@ -91,6 +91,24 @@ type Inbound struct {
HTTP *Address
}

func (i *Inbound) String() string {
if i == nil {
return ""
}

http := "none"
if i.HTTP != nil {
http = strconv.Itoa(i.HTTP.Port)
}

tchannel := "none"
if i.TChannel != nil {
tchannel = strconv.Itoa(i.TChannel.Port)
}

return fmt.Sprintf("Inbound:{HTTP: %s; TChannel: %s}", http, tchannel)
}

// Address is a struct that have a required port for tchannel/http transports.
// TODO(alsam) make it optional
type Address struct {
Expand All @@ -111,7 +129,7 @@ type dispatcherController struct {
startError error

configs []*yarpcConfig
dispatcher *yarpc.Dispatcher
dispatcher yarpc.Dispatcher
}

// Adds the config to the controller
Expand Down Expand Up @@ -155,12 +173,14 @@ func (c *dispatcherController) Start(host service.Host) error {

_dispatcherMu.Lock()
defer _dispatcherMu.Unlock()
if c.dispatcher, err = _dispatcherFn(host, cfg); err != nil {
var d *yarpc.Dispatcher
if d, err = _dispatcherFn(host, cfg); err != nil {
c.startError = err
return
}

c.startError = _starterFn(c.dispatcher)
c.dispatcher = *d
c.startError = _starterFn(&c.dispatcher)
})

return c.startError
Expand Down Expand Up @@ -249,7 +269,24 @@ func newYARPCModule(
module.config.inboundMiddleware = inboundMiddlewareFromCreateInfo(mi)
module.config.onewayInboundMiddleware = onewayInboundMiddlewareFromCreateInfo(mi)

_controller.addConfig(module.config)
// Try to resolve a controller first
// TODO(alsam) use dig options when available, because we can overwrite the controller in case of multiple
// modules registering a controller.
if err := dig.Resolve(&module.controller); err != nil {

// Try to register it then
module.controller = &dispatcherController{}
if errCr := dig.Register(module.controller); errCr != nil {
return nil, errs.Wrap(errCr, "can't register a dispatcher controller")
}

// Register dispatcher
if err := dig.Register(&module.controller.dispatcher); err != nil {
return nil, errs.Wrap(err, "unable to register the dispatcher")
}
}

module.controller.addConfig(module.config)

module.log.Info("Module successfuly created", "inbounds", module.config.Inbounds)

Expand Down Expand Up @@ -294,7 +331,7 @@ func (m *YARPCModule) Start(readyCh chan<- struct{}) <-chan error {
defer m.stateMu.Unlock()

// TODO(alsam) allow services to advertise with a name separate from the host name.
if err := _controller.Start(m.Host()); err != nil {
if err := m.controller.Start(m.Host()); err != nil {
ret <- errs.Wrap(err, "unable to start dispatcher")
return ret
}
Expand All @@ -316,8 +353,9 @@ func (m *YARPCModule) Stop() error {

m.stateMu.Lock()
defer m.stateMu.Unlock()

m.isRunning = false
return _controller.Stop()
return m.controller.Stop()
}

// IsRunning returns whether a module is running
Expand Down Expand Up @@ -354,9 +392,3 @@ func RegisterStarter(startFn StarterFn) {
func defaultYARPCStarter(dispatcher *yarpc.Dispatcher) error {
return dispatcher.Start()
}

// Dispatcher returns a dispatcher that can be used to create clients.
// It should be called after at least one module have been started, otherwise it will be nil.
func Dispatcher() *yarpc.Dispatcher {
return _controller.dispatcher
}
16 changes: 16 additions & 0 deletions modules/rpc/yarpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package rpc

import (
"fmt"
"testing"

"go.uber.org/fx/service"
Expand Down Expand Up @@ -68,3 +69,18 @@ func TestMergeOfEmptyConfigCollectionReturnsError(t *testing.T) {
assert.EqualError(t, err, "unable to merge empty configs")
assert.EqualError(t, c.Start(service.NopHost()), err.Error())
}

func TestInboundPrint(t *testing.T) {
t.Parallel()
var i *Inbound
assert.Equal(t, "", fmt.Sprint(i))

i = &Inbound{}
assert.Equal(t, "Inbound:{HTTP: none; TChannel: none}", fmt.Sprint(i))
i.HTTP = &Address{8080}
assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: none}", fmt.Sprint(i))
i.TChannel = &Address{9876}
assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: 9876}", fmt.Sprint(i))
i.HTTP = nil
assert.Equal(t, "Inbound:{HTTP: none; TChannel: 9876}", fmt.Sprint(i))
}

0 comments on commit b897b67

Please sign in to comment.