Skip to content

Commit

Permalink
Add cmd ACPT, redefine readBufSize
Browse files Browse the repository at this point in the history
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
  • Loading branch information
Sherlock-Holo committed Feb 28, 2019
1 parent 0e8d16f commit e012fde
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 53 deletions.
6 changes: 2 additions & 4 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const (
ClientMode mode = iota
ServerMode

defaultReadBufSize = 64 * 1024
defaultWriteBufSize = 64 * 1024
defaultReadBufSize = 64 * 1024 // can buf data size
// defaultWriteBufSize = 64 * 1024
)

// Config manager config.
Expand All @@ -20,7 +20,6 @@ type Config struct {
DebugLog bool // enable debug log
Mode mode // manager run mode
ReadBufSize int32
WriteBufSize int32
}

// DefaultConfig default config.
Expand All @@ -31,7 +30,6 @@ func DefaultConfig(mode mode) Config {
DebugLog: false,
Mode: mode,
ReadBufSize: defaultReadBufSize,
WriteBufSize: defaultWriteBufSize,
}
}

Expand Down
61 changes: 23 additions & 38 deletions link.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"io"
"math"
"net"
"strconv"
"sync"
Expand All @@ -14,7 +13,7 @@ import (
)

const (
ackPayloadLength = 2
ackPayloadLength = 4
)

// Link impalement io.ReadWriteCloser.
Expand Down Expand Up @@ -55,9 +54,7 @@ func newLink(id uint32, m *manager, mode mode) *link {

buf: bytes.NewBuffer(make([]byte, 0, m.cfg.ReadBufSize)),

readEvent: make(chan struct{}, 1),

writeWind: m.cfg.WriteBufSize,
readEvent: make(chan struct{}, 1),
writeEvent: make(chan struct{}, 1),
}

Expand Down Expand Up @@ -109,26 +106,30 @@ func (l *link) pushBytes(p []byte) {
// manager calls pushPacket to let link handles that packet.
func (l *link) pushPacket(p *Packet) {
switch p.CMD {
case PSH, NEW:
case PSH:
l.pushBytes(p.Payload)

case ACK:
// dial ok
if l.dialCtx != nil {
l.dialCtxFunc()
}
case NEW:
atomic.StoreInt32(&l.writeWind, int32(binary.BigEndian.Uint32(p.Payload[:4])))
l.pushBytes(p.Payload[4:])

// if ACK packet has error payload, it will be ignored.
case ACK:
/*// if ACK packet has error payload, it will be ignored.
if p.PayloadLength != ackPayloadLength {
return
}
}*/

if atomic.AddInt32(&l.writeWind, int32(binary.BigEndian.Uint16(p.Payload))) > 0 {
atomic.StoreInt32(&l.writeWind, int32(binary.BigEndian.Uint32(p.Payload)))
if atomic.LoadInt32(&l.writeWind) > 0 {
l.writeAvailable()
}

case CLOSE:
l.closeByPeer()

case ACPT:
l.dialCtxFunc()
atomic.StoreInt32(&l.writeWind, int32(binary.BigEndian.Uint32(p.Payload)))
}
}

Expand All @@ -155,7 +156,7 @@ func (l *link) Read(p []byte) (n int, err error) {
case <-l.ctx.Done():
// when link is closed, peer doesn't care about the ack because it won't send any packets again
default:
go l.sendACK(n)
go l.sendACK()
}

return
Expand Down Expand Up @@ -250,35 +251,19 @@ func (l *link) Close() error {

// closeByPeer when link is closed by peer, closeByPeer will be called.
func (l *link) closeByPeer() {
/*select {
case <-l.ctx.Done():
return
default:
l.ctxCloseFunc()
}*/

l.ctxCloseFunc()
l.manager.removeLink(l.ID)
}

// sendACK check if n > 65535, if n > 65535 will send more then 1 ACK packet.
func (l *link) sendACK(n int) {
ack := make([]byte, 2)

if n > math.MaxUint16 {
binary.BigEndian.PutUint16(ack, math.MaxUint16)
if err := l.manager.writePacket(newPacket(l.ID, ACK, ack)); err != nil {
return
}

l.sendACK(n - math.MaxUint16)
return
}

binary.BigEndian.PutUint16(ack, uint16(n))
if err := l.manager.writePacket(newPacket(l.ID, ACK, ack)); err != nil {
return
func (l *link) sendACK() {
buf := make([]byte, ackPayloadLength)
size := atomic.LoadInt32(&l.bufSize)
if size < 0 {
size = 0
}
binary.BigEndian.PutUint32(buf, uint32(size))
l.manager.writePacket(newPacket(l.ID, ACK, buf))
}

func (l *link) LocalAddr() net.Addr {
Expand Down
6 changes: 3 additions & 3 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ func TestLinkClientToServer(t *testing.T) {
}()

clientCfg := DefaultConfig(ClientMode)
clientCfg.DebugLog = true
// clientCfg.DebugLog = true
serverCfg := DefaultConfig(ServerMode)
serverCfg.DebugLog = true
// serverCfg.DebugLog = true

clientManager := NewManager(client, clientCfg)
serverManager := NewManager(server, serverCfg)
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestLinkClientToServer(t *testing.T) {
}

if string(b) != string(fileB) {
t.Fatalf("data verify failed, receive:\n%s\n want:\n%s", string(b), string(fileB))
t.Fatalf("data verify failed, receive:\n%s\n\n\n\n\nwant:\n%s", string(b), string(fileB))
}
}

Expand Down
21 changes: 18 additions & 3 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package link

import (
"context"
"encoding/binary"
"log"
"net"
"sync"
Expand Down Expand Up @@ -204,7 +205,7 @@ func (m *manager) readLoop() {
m.acceptQueue <- link
}

case PSH, ACK, CLOSE:
case PSH, ACK, CLOSE, ACPT:
if l, ok := m.links.Load(packet.ID); ok {
l.(*link).pushPacket(packet)
}
Expand Down Expand Up @@ -257,7 +258,14 @@ func (m *manager) DialData(ctx context.Context, b []byte) (Link, error) {
default:
m.links.Store(link.ID, link)

newP := newPacket(link.ID, NEW, b)
// tell readableBufSize
buf := make([]byte, 4+len(b))
binary.BigEndian.PutUint32(buf, uint32(m.cfg.ReadBufSize))

// write optional data
copy(buf[4:], b)

newP := newPacket(link.ID, NEW, buf)
if err := m.writePacket(newP); err != nil {
return nil, err
}
Expand Down Expand Up @@ -287,7 +295,14 @@ func (m *manager) Accept() (Link, error) {
return nil, ErrManagerClosed

case l = <-m.acceptQueue:
ackNewPacket := newPacket(l.ID, ACK, nil)
readableBufSize := make([]byte, 4)
size := atomic.LoadInt32(&l.bufSize)
if size < 0 {
size = 0
}
binary.BigEndian.PutUint32(readableBufSize, uint32(size))

ackNewPacket := newPacket(l.ID, ACPT, readableBufSize)
if err := m.writePacket(ackNewPacket); err != nil {
return nil, err
}
Expand Down
12 changes: 7 additions & 5 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type Cmd = uint8

const (
Version = 5
Version = 6

VersionLength = 1
IDLength = 4
Expand All @@ -23,8 +23,9 @@ const (
PSH Cmd = 1 << 7
CLOSE Cmd = 1 << 6
PING Cmd = 1 << 5
ACK Cmd = 1 << 4 // 2 bytes data, uint16, the other side has read [uint16] bytes data
NEW Cmd = 1 << 3 // can take some data to reduce dial time
ACK Cmd = 1 << 4 // [4 bytes data readableBufSize] tell peer the readable size
NEW Cmd = 1 << 3 // can take some data to reduce dial time, [4 bytes readableBufSize + optional data]
ACPT Cmd = 1 << 2 // take [4 bytes readableBufSize]
)

// header[VersionLength + IDLength + CMDLength + indefinite length bytes] [payload]
Expand All @@ -39,6 +40,7 @@ type Packet struct {
// PING 0b0010,0000
// ACK 0b0001,0000
// NEW 0b0000,1000
// ACPT 0b0000,0100
// RSV 0b0000,0000
CMD Cmd

Expand All @@ -60,7 +62,7 @@ func newPacket(id uint32, cmd Cmd, payload []byte) *Packet {
}

switch cmd {
case PSH, CLOSE, PING, ACK, NEW:
case PSH, CLOSE, PING, ACK, NEW, ACPT:
packet.CMD = cmd

default:
Expand Down Expand Up @@ -154,7 +156,7 @@ func decodeFrom(r net.Conn) (*Packet, error) {
cmdByte := b[5]

switch cmdByte {
case PSH, CLOSE, PING, ACK, NEW:
case PSH, CLOSE, PING, ACK, NEW, ACPT:
p.CMD = cmdByte

default:
Expand Down

0 comments on commit e012fde

Please sign in to comment.