Skip to content

Commit

Permalink
Merge pull request #176 from tdakkota/feature/pool-and-uploader
Browse files Browse the repository at this point in the history
Add migration test, refactor connection creation, some minor changes
  • Loading branch information
ernado committed Mar 8, 2021
2 parents 2d04c12 + 3c14ce1 commit 761c219
Show file tree
Hide file tree
Showing 66 changed files with 1,417 additions and 467 deletions.
3 changes: 2 additions & 1 deletion cmd/gotdecho/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func run(ctx context.Context) error {
return nil
}

return sender.Reply(ctx, u).Text(ctx, m.Message)
_, err := sender.Reply(ctx, u).Text(ctx, m.Message)
return err
})
return nil
}, telegram.RunUntilCanceled)
Expand Down
14 changes: 4 additions & 10 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,8 @@ func (c *DC) dead(r *poolConn, deadErr error) {
return // Already deleted.
}

c.stuck.Reset()
c.mu.Lock()
defer c.mu.Unlock()
r.dead.Signal()

c.total--
remaining := c.total
if remaining < 0 {
Expand All @@ -127,6 +124,9 @@ func (c *DC) dead(r *poolConn, deadErr error) {
c.free = c.free[:len(c.free)-1]
}

r.dead.Signal()
c.stuck.Signal()

c.log.Debug("Connection died",
zap.Int64("remaining", remaining),
zap.Int64("conn_id", r.id),
Expand Down Expand Up @@ -216,6 +216,7 @@ retry:
)
return conn, nil
case <-c.stuck.Ready():
c.stuck.Reset()
c.log.Debug("Some connection dead, try to create new connection, cancel waiting")

c.freeReq.delete(key)
Expand Down Expand Up @@ -267,13 +268,6 @@ func (c *DC) InvokeRaw(ctx context.Context, input bin.Encoder, output bin.Decode

c.log.Debug("DC Invoke")
err = conn.InvokeRaw(ctx, input, output)
if err != nil {
select {
case <-conn.Dead():
continue
default:
}
}
c.release(conn)
if err != nil {
c.log.Debug("DC Invoke failed", zap.Error(err))
Expand Down
250 changes: 250 additions & 0 deletions internal/pool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package pool

import (
"context"
"fmt"
"math/rand"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/gotd/td/bin"
"github.com/gotd/td/internal/tdsync"
)

type invokerFunc func(ctx context.Context, input bin.Encoder, output bin.Decoder) error

type mockConn struct {
ready *tdsync.Ready
stop *tdsync.Ready
locker *sync.RWMutex
invoke invokerFunc
}

func newMockConn(invoke invokerFunc) mockConn {
return mockConn{
ready: tdsync.NewReady(),
stop: tdsync.NewReady(),
locker: new(sync.RWMutex),
invoke: invoke,
}
}

func (m mockConn) Run(ctx context.Context) error {
m.ready.Signal()
select {
case <-m.stop.Ready():
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func (m mockConn) InvokeRaw(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
m.locker.RLock()
defer m.locker.RUnlock()
return m.invoke(ctx, input, output)
}

func (m mockConn) Ready() <-chan struct{} {
return m.ready.Ready()
}

func (m mockConn) lock() sync.Locker {
m.locker.Lock()
return m.locker
}

func (m mockConn) kill() {
m.stop.Signal()
}

type connBuilder struct {
conns []mockConn
lockers map[int]sync.Locker
mux sync.Mutex

invoke invokerFunc
ctx context.Context
}

func newConnBuilder(ctx context.Context, invoke invokerFunc) *connBuilder {
return &connBuilder{invoke: invoke, ctx: ctx, lockers: map[int]sync.Locker{}}
}

func (c *connBuilder) create() Conn {
c.mux.Lock()
defer c.mux.Unlock()

i := len(c.conns)
c.conns = append(c.conns, newMockConn(c.invoke))
return c.conns[i]
}

func (c *connBuilder) lockOne() {
c.mux.Lock()

if len(c.conns) == 0 {
c.mux.Unlock()
loop:
for {
select {
case <-c.ctx.Done():
panic(c.ctx.Err())
default:
c.mux.Lock()
if len(c.conns) != 0 {
break loop
}
c.mux.Unlock()

runtime.Gosched()
}
}
}

defer c.mux.Unlock()
var n int
for {
n = rand.Intn(len(c.conns))
_, ok := c.lockers[n]
if !ok {
break
}
}

locker := c.conns[n].lock()
c.lockers[n] = locker
}

func (c *connBuilder) unlockOne() {
c.mux.Lock()
defer c.mux.Unlock()

var idx int
var locker sync.Locker
for idx, locker = range c.lockers {
break
}

if locker == nil {
panic("no lockers")
}

delete(c.lockers, idx)
locker.Unlock()
}

func (c *connBuilder) killOne() {
c.mux.Lock()
defer c.mux.Unlock()

if len(c.conns) == 0 {
return
}

n := rand.Intn(len(c.conns))
c.conns[n].kill()

// Delete from SliceTricks.
copy(c.conns[n:], c.conns[n+1:])
c.conns[len(c.conns)-1] = mockConn{}
c.conns = c.conns[:len(c.conns)-1]
}

type script []byte

func runScript(open int64, s script) func(t *testing.T) {
return func(t *testing.T) {
a := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
log := zaptest.NewLogger(t)

b := newConnBuilder(ctx, func(ctx context.Context, input bin.Encoder, output bin.Decoder) error {
return nil
})
dc := NewDC(ctx, 2, b.create, DCOptions{
Logger: log.Named("dc"),
MaxOpenConnections: open,
})
defer dc.Close(ctx)

wg := tdsync.NewCancellableGroup(ctx)
for _, action := range s {
switch action {
case 'i':
a.NoError(dc.InvokeRaw(ctx, nil, nil))
case 'a':
wg.Go(func(ctx context.Context) error {
return dc.InvokeRaw(ctx, nil, nil)
})
case 'k':
b.killOne()
case 'l':
b.lockOne()
case 'u':
b.unlockOne()
default:
t.Fatalf("Invalid action %c", action)
}
}
a.NoError(wg.Wait())
}
}

func testAllScenario(open int64) func(t *testing.T) {
return func(t *testing.T) {
scripts := []struct {
name string
code string
minConn int64
}{
{"", "iki", 0},
{"", "ikii", 0},
{"", "ilaaui", 2},
{"", "alaaui", 2},
{"", "ilkaaui", 2},
{"", "ilkaui", 2},
}

for _, sc := range scripts {
if open != 0 && open < sc.minConn {
continue
}

var s strings.Builder
if sc.name == "" {
for _, action := range []byte(sc.code) {
switch action {
case 'i':
s.WriteString("Call")
case 'a':
s.WriteString("Async")
case 'k':
s.WriteString("Kill")
case 'l':
s.WriteString("Lock")
case 'u':
s.WriteString("Unlock")
default:
t.Fatalf("Invalid action %c", action)
}
}
}
t.Run(s.String(), runScript(open, []byte(sc.code)))
}
}
}

func TestDC(t *testing.T) {
limits := []int64{0, 1, 2, 4}
for _, limit := range limits {
t.Run(fmt.Sprintf("Conns%d", limit), testAllScenario(limit))
}
}
17 changes: 17 additions & 0 deletions telegram/cfg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package telegram

import (
"context"

"go.uber.org/zap"
)

func (c *Client) fetchConfig(ctx context.Context) {
cfg, err := c.tg.HelpGetConfig(ctx)
if err != nil {
c.log.Warn("Got error on config update", zap.Error(err))
return
}

c.cfg.Store(*cfg)
}
27 changes: 27 additions & 0 deletions telegram/cfg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package telegram

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/gotd/td/bin"
"github.com/gotd/td/tg"
)

func TestClient_fetchConfig(t *testing.T) {
a := require.New(t)
cfg := &tg.Config{
ThisDC: 10,
}
client := newTestClient(func(id int64, body bin.Encoder) (bin.Encoder, error) {
a.IsType(&tg.HelpGetConfigRequest{}, body)
return cfg, nil
})

a.NoError(client.processUpdates(&tg.Updates{
Updates: []tg.UpdateClass{&tg.UpdateConfig{}},
}))

a.Equal(*cfg, client.Config())
}

0 comments on commit 761c219

Please sign in to comment.