Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Port grav#87 to e2core. #410

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions foundation/bus/bus/message.go
Expand Up @@ -21,6 +21,16 @@ type MsgFunc func(Message) error
// MsgChan is a channel that accepts a message
type MsgChan chan Message

// msgOriginChan is a channel that accepts a message with an origin
type msgOriginChan chan messageWithOrigin

type messageWithOrigin struct {
Message

// originUUID is the UUID of the pod from which the message originated.
originUUID string
}

// Message represents a message
type Message interface {
// Unique ID for this message
Expand Down
6 changes: 3 additions & 3 deletions foundation/bus/bus/messagebus.go
Expand Up @@ -7,15 +7,15 @@ const (
// messageBus is responsible for emitting messages among the connected pods
// and managing the failure cases for those pods
type messageBus struct {
busChan MsgChan
busChan msgOriginChan
pool *connectionPool
buffer *MsgBuffer
}

// newMessageBus creates a new messageBus
func newMessageBus() *messageBus {
b := &messageBus{
busChan: make(chan Message, defaultBusChanSize),
busChan: make(chan messageWithOrigin, defaultBusChanSize),
pool: newConnectionPool(),
buffer: NewMsgBuffer(defaultBufferSize),
}
Expand Down Expand Up @@ -54,7 +54,7 @@ func (b *messageBus) start() {
}()
}

func (b *messageBus) traverse(msg Message, start *podConnection) {
func (b *messageBus) traverse(msg messageWithOrigin, start *podConnection) {
startID := start.ID
conn := start

Expand Down
8 changes: 4 additions & 4 deletions foundation/bus/bus/msgbuffer.go
Expand Up @@ -12,7 +12,7 @@ const (
// Oldest messages are automatically evicted as new ones are added
// past said limit. Push(), Next(), and Iter() are thread-safe.
type MsgBuffer struct {
msgs map[string]Message
msgs map[string]messageWithOrigin
order []string
limit int
startIndex int
Expand All @@ -22,7 +22,7 @@ type MsgBuffer struct {

func NewMsgBuffer(limit int) *MsgBuffer {
m := &MsgBuffer{
msgs: map[string]Message{},
msgs: map[string]messageWithOrigin{},
order: []string{},
limit: limit,
startIndex: 0,
Expand All @@ -34,7 +34,7 @@ func NewMsgBuffer(limit int) *MsgBuffer {
}

// Push pushes a new message onto the end of the buffer and evicts the oldest, if needed (based on limit)
func (m *MsgBuffer) Push(msg Message) {
func (m *MsgBuffer) Push(msg messageWithOrigin) {
m.lock.Lock()
defer m.lock.Unlock()

Expand Down Expand Up @@ -80,7 +80,7 @@ func (m *MsgBuffer) Next() Message {
}

// Iter calls msgFunc once per message in the buffer
func (m *MsgBuffer) Iter(msgFunc MsgFunc) {
func (m *MsgBuffer) Iter(msgFunc func(messageWithOrigin) error) {
m.lock.RLock()
defer m.lock.RUnlock()

Expand Down
24 changes: 16 additions & 8 deletions foundation/bus/bus/pod.go
Expand Up @@ -4,6 +4,8 @@ import (
"errors"
"sync"
"sync/atomic"

"github.com/google/uuid"
)

const (
Expand Down Expand Up @@ -38,12 +40,14 @@ Created with Monodraw
// and immediately route a message between its owner and the Bus. The Bus is responsible for any "smarts".
// Messages coming from the bus are filtered using the pod's messageFilter, which is configurable by the caller.
type Pod struct {
uuid string

onFunc MsgFunc // the onFunc is called whenever a message is received
onFuncLock sync.RWMutex

messageChan MsgChan // messageChan is used to receive messages coming from the bus
feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status
busChan MsgChan // busChan is used to emit messages to the bus
messageChan msgOriginChan // messageChan is used to receive messages coming from the bus
feedbackChan MsgChan // feedbackChan is used to send "feedback" to the bus about the pod's status
busChan msgOriginChan // busChan is used to emit messages to the bus

*messageFilter // the embedded messageFilter controls which messages reach the onFunc

Expand All @@ -60,10 +64,11 @@ type podOpts struct {
}

// newPod creates a new Pod
func newPod(busChan MsgChan, tunnel func(string, Message) error, opts *podOpts) *Pod {
func newPod(busChan msgOriginChan, tunnel func(string, Message) error, opts *podOpts) *Pod {
p := &Pod{
uuid: uuid.NewString(),
onFuncLock: sync.RWMutex{},
messageChan: make(chan Message, defaultPodChanSize),
messageChan: make(chan messageWithOrigin, defaultPodChanSize),
feedbackChan: make(chan Message, defaultPodChanSize),
busChan: busChan,
messageFilter: newMessageFilter(),
Expand Down Expand Up @@ -93,7 +98,10 @@ func (p *Pod) Send(msg Message) *MsgReceipt {

p.FilterUUID(msg.UUID(), false) // don't allow the same message to bounce back through this pod

p.busChan <- msg
p.busChan <- messageWithOrigin{
Message: msg,
originUUID: p.uuid,
}

t := &MsgReceipt{
UUID: msg.UUID(),
Expand Down Expand Up @@ -247,7 +255,7 @@ func (p *Pod) setOnFunc(on MsgFunc) {
}

// busChans returns the messageChan and feedbackChan to be used by the bus
func (p *Pod) busChans() (MsgChan, MsgChan) {
func (p *Pod) busChans() (msgOriginChan, MsgChan) {
return p.messageChan, p.feedbackChan
}

Expand All @@ -268,7 +276,7 @@ func (p *Pod) start() {
return
}

if p.allow(msg) {
if p.allow(msg) && p.uuid != msg.originUUID {
if err := p.onFunc(msg); err != nil {
// if the onFunc failed, send it back to the bus to be re-sent later
p.feedbackChan <- msg
Expand Down
30 changes: 30 additions & 0 deletions foundation/bus/bus/pod_test.go
Expand Up @@ -38,6 +38,36 @@ func TestPodFilter(t *testing.T) {
}
}

func TestPodFilterMessageSentBySelf(t *testing.T) {
g := New()

counter := testutil.NewAsyncCounter(100)

onFunc := func(msg Message) error {
counter.Count()

return nil
}

p1 := g.Connect()
p1.On(onFunc)

p2 := g.Connect()
//p2.On(onFunc)

p1.Send(NewMsg(MsgTypeDefault, []byte("hello p2")))
// Message sent by p1 should end up with self
if err := counter.Wait(0, 1); err != nil {
t.Error(err)
}

p2.Send(NewMsg(MsgTypeDefault, []byte("hello p1")))
// Message sent by p2 should end up with p1
if err := counter.Wait(1, 1); err != nil {
t.Error(err)
}
}

func TestWaitOn(t *testing.T) {
g := New()

Expand Down
12 changes: 6 additions & 6 deletions foundation/bus/bus/podconnection.go
Expand Up @@ -10,10 +10,10 @@ type podConnection struct {
ID int64
next *podConnection

messageChan MsgChan
messageChan msgOriginChan
feedbackChan MsgChan

failed []Message
failed []messageWithOrigin

lock *sync.RWMutex
connected bool
Expand All @@ -35,7 +35,7 @@ func newPodConnection(id int64, pod *Pod) *podConnection {
next: nil,
messageChan: msgChan,
feedbackChan: feedbackChan,
failed: make([]Message, 0),
failed: make([]messageWithOrigin, 0),
lock: &sync.RWMutex{},
connected: true,
}
Expand All @@ -46,7 +46,7 @@ func newPodConnection(id int64, pod *Pod) *podConnection {
// send asynchronously writes a message to a connection's messageChan
// ordering to the messageChan if it becomes full is not guaranteed, this
// is sacrificed to ensure that the bus does not block because of a delinquient pod
func (p *podConnection) send(msg Message) {
func (p *podConnection) send(msg messageWithOrigin) {
go func() {
p.lock.RLock()
defer p.lock.RUnlock()
Expand Down Expand Up @@ -80,7 +80,7 @@ func (p *podConnection) checkStatus() *connStatus {
} else if feedbackMsg == podFeedbackMsgDisconnect {
status.WantsDisconnect = true
} else {
p.failed = append(p.failed, feedbackMsg)
p.failed = append(p.failed, messageWithOrigin{Message: feedbackMsg})
status.Error = errFailedMessage
}
default:
Expand Down Expand Up @@ -113,7 +113,7 @@ func (p *podConnection) flushFailed() {
}

if len(p.failed) > 0 {
p.failed = make([]Message, 0)
p.failed = make([]messageWithOrigin, 0)
}
}

Expand Down
2 changes: 1 addition & 1 deletion foundation/bus/bus/pool.go
Expand Up @@ -112,7 +112,7 @@ func (c *connectionPool) replayNext(buffer *MsgBuffer) {
next := c.peek()

// iterate over the buffer and send each message to the pod
buffer.Iter(func(msg Message) error {
buffer.Iter(func(msg messageWithOrigin) error {
next.send(msg)

return nil
Expand Down