diff --git a/foundation/bus/bus/message.go b/foundation/bus/bus/message.go index a42982a2..319b6d2d 100644 --- a/foundation/bus/bus/message.go +++ b/foundation/bus/bus/message.go @@ -21,6 +21,16 @@ type MsgFunc func(Message) error // MsgChan is a channel that accepts a message type MsgChan chan Message +// msgOriginChan is a channel that accepts a message with an origin +type msgOriginChan chan messageWithOrigin + +type messageWithOrigin struct { + Message + + // originUUID is the UUID of the pod from which the message originated. + originUUID string +} + // Message represents a message type Message interface { // Unique ID for this message diff --git a/foundation/bus/bus/messagebus.go b/foundation/bus/bus/messagebus.go index 96057198..8de3bb0d 100644 --- a/foundation/bus/bus/messagebus.go +++ b/foundation/bus/bus/messagebus.go @@ -7,7 +7,7 @@ const ( // messageBus is responsible for emitting messages among the connected pods // and managing the failure cases for those pods type messageBus struct { - busChan MsgChan + busChan msgOriginChan pool *connectionPool buffer *MsgBuffer } @@ -15,7 +15,7 @@ type messageBus struct { // newMessageBus creates a new messageBus func newMessageBus() *messageBus { b := &messageBus{ - busChan: make(chan Message, defaultBusChanSize), + busChan: make(chan messageWithOrigin, defaultBusChanSize), pool: newConnectionPool(), buffer: NewMsgBuffer(defaultBufferSize), } @@ -54,7 +54,7 @@ func (b *messageBus) start() { }() } -func (b *messageBus) traverse(msg Message, start *podConnection) { +func (b *messageBus) traverse(msg messageWithOrigin, start *podConnection) { startID := start.ID conn := start diff --git a/foundation/bus/bus/msgbuffer.go b/foundation/bus/bus/msgbuffer.go index 3090b2c4..19bf6383 100644 --- a/foundation/bus/bus/msgbuffer.go +++ b/foundation/bus/bus/msgbuffer.go @@ -12,7 +12,7 @@ const ( // Oldest messages are automatically evicted as new ones are added // past said limit. Push(), Next(), and Iter() are thread-safe. type MsgBuffer struct { - msgs map[string]Message + msgs map[string]messageWithOrigin order []string limit int startIndex int @@ -22,7 +22,7 @@ type MsgBuffer struct { func NewMsgBuffer(limit int) *MsgBuffer { m := &MsgBuffer{ - msgs: map[string]Message{}, + msgs: map[string]messageWithOrigin{}, order: []string{}, limit: limit, startIndex: 0, @@ -34,7 +34,7 @@ func NewMsgBuffer(limit int) *MsgBuffer { } // Push pushes a new message onto the end of the buffer and evicts the oldest, if needed (based on limit) -func (m *MsgBuffer) Push(msg Message) { +func (m *MsgBuffer) Push(msg messageWithOrigin) { m.lock.Lock() defer m.lock.Unlock() @@ -80,7 +80,7 @@ func (m *MsgBuffer) Next() Message { } // Iter calls msgFunc once per message in the buffer -func (m *MsgBuffer) Iter(msgFunc MsgFunc) { +func (m *MsgBuffer) Iter(msgFunc func(messageWithOrigin) error) { m.lock.RLock() defer m.lock.RUnlock() diff --git a/foundation/bus/bus/pod.go b/foundation/bus/bus/pod.go index 673ed534..99fa2f04 100644 --- a/foundation/bus/bus/pod.go +++ b/foundation/bus/bus/pod.go @@ -4,6 +4,8 @@ import ( "errors" "sync" "sync/atomic" + + "github.com/google/uuid" ) const ( @@ -38,12 +40,14 @@ Created with Monodraw // and immediately route a message between its owner and the Bus. The Bus is responsible for any "smarts". // Messages coming from the bus are filtered using the pod's messageFilter, which is configurable by the caller. type Pod struct { + uuid string + onFunc MsgFunc // the onFunc is called whenever a message is received onFuncLock sync.RWMutex - messageChan MsgChan // messageChan is used to receive messages coming from the bus - feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status - busChan MsgChan // busChan is used to emit messages to the bus + messageChan msgOriginChan // messageChan is used to receive messages coming from the bus + feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status + busChan msgOriginChan // busChan is used to emit messages to the bus *messageFilter // the embedded messageFilter controls which messages reach the onFunc @@ -60,10 +64,11 @@ type podOpts struct { } // newPod creates a new Pod -func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) *Pod { +func newPod(busChan msgOriginChan, tunnel func(string, Message) error, opts *podOpts) *Pod { p := &Pod{ + uuid: uuid.NewString(), onFuncLock: sync.RWMutex{}, - messageChan: make(chan Message, defaultPodChanSize), + messageChan: make(chan messageWithOrigin, defaultPodChanSize), feedbackChan: make(chan Message, defaultPodChanSize), busChan: busChan, messageFilter: newMessageFilter(), @@ -93,7 +98,10 @@ func (p *Pod) Send(msg Message) *MsgReceipt { p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod - p.busChan <- msg + p.busChan <- messageWithOrigin{ + Message: msg, + originUUID: p.uuid, + } t := &MsgReceipt{ UUID: msg.UUID(), @@ -247,7 +255,7 @@ func (p *Pod) setOnFunc(on MsgFunc) { } // busChans returns the messageChan and feedbackChan to be used by the bus -func (p *Pod) busChans() (MsgChan, MsgChan) { +func (p *Pod) busChans() (msgOriginChan, MsgChan) { return p.messageChan, p.feedbackChan } @@ -268,7 +276,7 @@ func (p *Pod) start() { return } - if p.allow(msg) { + if p.allow(msg) && p.uuid != msg.originUUID { if err := p.onFunc(msg); err != nil { // if the onFunc failed, send it back to the bus to be re-sent later p.feedbackChan <- msg diff --git a/foundation/bus/bus/pod_test.go b/foundation/bus/bus/pod_test.go index 9228ad48..0d001d4e 100644 --- a/foundation/bus/bus/pod_test.go +++ b/foundation/bus/bus/pod_test.go @@ -38,6 +38,36 @@ func TestPodFilter(t *testing.T) { } } +func TestPodFilterMessageSentBySelf(t *testing.T) { + g := New() + + counter := testutil.NewAsyncCounter(100) + + onFunc := func(msg Message) error { + counter.Count() + + return nil + } + + p1 := g.Connect() + p1.On(onFunc) + + p2 := g.Connect() + //p2.On(onFunc) + + p1.Send(NewMsg(MsgTypeDefault, []byte("hello p2"))) + // Message sent by p1 should end up with self + if err := counter.Wait(0, 1); err != nil { + t.Error(err) + } + + p2.Send(NewMsg(MsgTypeDefault, []byte("hello p1"))) + // Message sent by p2 should end up with p1 + if err := counter.Wait(1, 1); err != nil { + t.Error(err) + } +} + func TestWaitOn(t *testing.T) { g := New() diff --git a/foundation/bus/bus/podconnection.go b/foundation/bus/bus/podconnection.go index 8646aa2d..5cf8edd0 100644 --- a/foundation/bus/bus/podconnection.go +++ b/foundation/bus/bus/podconnection.go @@ -10,10 +10,10 @@ type podConnection struct { ID int64 next *podConnection - messageChan MsgChan + messageChan msgOriginChan feedbackChan MsgChan - failed []Message + failed []messageWithOrigin lock *sync.RWMutex connected bool @@ -35,7 +35,7 @@ func newPodConnection(id int64, pod *Pod) *podConnection { next: nil, messageChan: msgChan, feedbackChan: feedbackChan, - failed: make([]Message, 0), + failed: make([]messageWithOrigin, 0), lock: &sync.RWMutex{}, connected: true, } @@ -46,7 +46,7 @@ func newPodConnection(id int64, pod *Pod) *podConnection { // send asynchronously writes a message to a connection's messageChan // ordering to the messageChan if it becomes full is not guaranteed, this // is sacrificed to ensure that the bus does not block because of a delinquient pod -func (p *podConnection) send(msg Message) { +func (p *podConnection) send(msg messageWithOrigin) { go func() { p.lock.RLock() defer p.lock.RUnlock() @@ -80,7 +80,7 @@ func (p *podConnection) checkStatus() *connStatus { } else if feedbackMsg == podFeedbackMsgDisconnect { status.WantsDisconnect = true } else { - p.failed = append(p.failed, feedbackMsg) + p.failed = append(p.failed, messageWithOrigin{Message: feedbackMsg}) status.Error = errFailedMessage } default: @@ -113,7 +113,7 @@ func (p *podConnection) flushFailed() { } if len(p.failed) > 0 { - p.failed = make([]Message, 0) + p.failed = make([]messageWithOrigin, 0) } } diff --git a/foundation/bus/bus/pool.go b/foundation/bus/bus/pool.go index b35d38c2..ce30df60 100644 --- a/foundation/bus/bus/pool.go +++ b/foundation/bus/bus/pool.go @@ -112,7 +112,7 @@ func (c *connectionPool) replayNext(buffer *MsgBuffer) { next := c.peek() // iterate over the buffer and send each message to the pod - buffer.Iter(func(msg Message) error { + buffer.Iter(func(msg messageWithOrigin) error { next.send(msg) return nil