Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #189 from libp2p/open-stream-context
Browse files Browse the repository at this point in the history
change OpenStream to accept a context
  • Loading branch information
marten-seemann committed Dec 19, 2020
2 parents 3123af3 + 7b60f8c commit 0b4a78f
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 23 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/interop.yml
Expand Up @@ -60,7 +60,10 @@ jobs:
git reflog --decorate -1
TAGS=()
if [[ `git merge-base --is-ancestor HEAD 126c64772ba0aef0b2b6d58ff36e55a93f9253a7; echo $?` != "1" ]]; then
TAGS+=("oldstream")
TAGS+=("old_stream_close")
fi
if [[ `git merge-base --is-ancestor HEAD 3123af36d6cec13e31dac75058c8046e6e4a6690; echo $?` != "1" ]]; then
TAGS+=("stream_open_no_context")
fi
if [[ "${{ matrix.cfg.retireBugBackwardsCompatiblityMode }}" == "true" ]]; then
TAGS+=("retirebugcompatmode")
Expand Down
2 changes: 1 addition & 1 deletion cmd/client/main.go
Expand Up @@ -49,7 +49,7 @@ func run(raddr string, p string) error {
return err
}
defer conn.Close()
str, err := conn.OpenStream()
str, err := conn.OpenStream(context.Background())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions conn.go
Expand Up @@ -37,8 +37,8 @@ func (c *conn) IsClosed() bool {
}

// OpenStream creates a new stream.
func (c *conn) OpenStream() (mux.MuxedStream, error) {
qstr, err := c.sess.OpenStreamSync(context.Background())
func (c *conn) OpenStream(ctx context.Context) (mux.MuxedStream, error) {
qstr, err := c.sess.OpenStreamSync(ctx)
return &stream{Stream: qstr}, err
}

Expand Down
6 changes: 3 additions & 3 deletions conn_test.go
Expand Up @@ -130,7 +130,7 @@ var _ = Describe("Connection", func() {
Expect(err).ToNot(HaveOccurred())
defer serverConn.Close()

str, err := conn.OpenStream()
str, err := conn.OpenStream(context.Background())
Expect(err).ToNot(HaveOccurred())
_, err = str.Write([]byte("foobar"))
Expect(err).ToNot(HaveOccurred())
Expand Down Expand Up @@ -251,7 +251,7 @@ var _ = Describe("Connection", func() {
for _, c := range []tpt.CapableConn{serverConn1, serverConn2} {
go func(conn tpt.CapableConn) {
defer GinkgoRecover()
str, err := conn.OpenStream()
str, err := conn.OpenStream(context.Background())
Expect(err).ToNot(HaveOccurred())
defer str.Close()
_, err = str.Write(data)
Expand Down Expand Up @@ -315,7 +315,7 @@ var _ = Describe("Connection", func() {
defer GinkgoRecover()
conn, err := ln.Accept()
Expect(err).ToNot(HaveOccurred())
str, err := conn.OpenStream()
str, err := conn.OpenStream(context.Background())
Expect(err).ToNot(HaveOccurred())
str.Write([]byte("foobar"))
}()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.14
require (
github.com/golang/mock v1.4.4
github.com/ipfs/go-log v1.0.4
github.com/libp2p/go-libp2p-core v0.7.0
github.com/libp2p/go-libp2p-core v0.8.0
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-netroute v0.1.3
github.com/lucas-clemente/quic-go v0.19.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -126,8 +126,8 @@ github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoR
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-core v0.7.0 h1:4a0TMjrWNTZlNvcqxZmrMRDi/NQWrhwO2pkTuLSQ/IQ=
github.com/libp2p/go-libp2p-core v0.7.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.0 h1:5K3mT+64qDTKbV3yTdbMCzJ7O6wbNsavAEb8iqBvBcI=
github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-tls v0.1.3 h1:twKMhMu44jQO+HgQK9X8NHO5HkeJu2QbhLzLJpa8oNM=
github.com/libp2p/go-libp2p-tls v0.1.3/go.mod h1:wZfuewxOndz5RTnCAxFliGjvYSDA40sKitV4c50uI1M=
github.com/libp2p/go-maddr-filter v0.1.0/go.mod h1:VzZhTXkMucEGGEOSKddrwGiOv0tUhgnKqNEmIAz/bPU=
Expand Down
14 changes: 14 additions & 0 deletions integrationtests/conn/conn.go
@@ -0,0 +1,14 @@
// +build !stream_open_no_context

package conn

import (
"context"

"github.com/libp2p/go-libp2p-core/mux"
tpt "github.com/libp2p/go-libp2p-core/transport"
)

func OpenStream(ctx context.Context, c tpt.CapableConn) (mux.MuxedStream, error) {
return c.OpenStream(ctx)
}
14 changes: 14 additions & 0 deletions integrationtests/conn/conn_without_stream_open_context.go
@@ -0,0 +1,14 @@
// +build stream_open_no_context

package conn

import (
"context"

"github.com/libp2p/go-libp2p-core/mux"
tpt "github.com/libp2p/go-libp2p-core/transport"
)

func OpenStream(_ context.Context, c tpt.CapableConn) (mux.MuxedStream, error) {
return c.OpenStream()
}
25 changes: 13 additions & 12 deletions integrationtests/main.go
Expand Up @@ -19,6 +19,7 @@ import (
ma "github.com/multiformats/go-multiaddr"
"golang.org/x/sync/errgroup"

"github.com/libp2p/go-libp2p-quic-transport/integrationtests/conn"
"github.com/libp2p/go-libp2p-quic-transport/integrationtests/stream"
)

Expand Down Expand Up @@ -151,18 +152,18 @@ func testSingleFileTransfer(tr transport.Transport, serverKey crypto.PubKey, add
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, err := tr.Dial(ctx, addr, serverPeerID)
c, err := tr.Dial(ctx, addr, serverPeerID)
if err != nil {
return fmt.Errorf("Dial failed: %w", err)
}
defer conn.Close()
if !conn.RemotePublicKey().Equals(serverKey) {
defer c.Close()
if !c.RemotePublicKey().Equals(serverKey) {
return errors.New("mismatching public keys")
}
if conn.RemotePeer() != serverPeerID {
return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), serverPeerID.Pretty())
if c.RemotePeer() != serverPeerID {
return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", c.RemotePeer().Pretty(), serverPeerID.Pretty())
}
st, err := conn.OpenStream()
st, err := conn.OpenStream(context.Background(), c)
if err != nil {
return err
}
Expand Down Expand Up @@ -192,21 +193,21 @@ func testMultipleFileTransfer(tr transport.Transport, serverKey crypto.PubKey, a
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
conn, err := tr.Dial(ctx, addr, serverPeerID)
c, err := tr.Dial(ctx, addr, serverPeerID)
if err != nil {
return fmt.Errorf("Dial failed: %w", err)
}
defer conn.Close()
if !conn.RemotePublicKey().Equals(serverKey) {
defer c.Close()
if !c.RemotePublicKey().Equals(serverKey) {
return errors.New("mismatching public keys")
}
if conn.RemotePeer() != serverPeerID {
return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", conn.RemotePeer().Pretty(), serverPeerID.Pretty())
if c.RemotePeer() != serverPeerID {
return fmt.Errorf("remote Peer ID mismatch. Got %s, expected %s", c.RemotePeer().Pretty(), serverPeerID.Pretty())
}
var g errgroup.Group
for i := 0; i < 2000; i++ {
g.Go(func() error {
st, err := conn.OpenStream()
st, err := conn.OpenStream(context.Background(), c)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion integrationtests/stream/stream_old_interface.go
@@ -1,4 +1,4 @@
// +build oldstream
// +build old_stream_close

package stream

Expand Down

0 comments on commit 0b4a78f

Please sign in to comment.