[Feature] UUIDs, protocol versioning, v2 protocol w/ dag-cbor messaging (#332)

* feat(net): initial dag-cbor protocol support

also added first roundtrip benchmark
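For context, a minimal sketch of what such a roundtrip benchmark can look like with go-ipld-prime's dag-cbor codec; the node built here is a stand-in, not the real GraphSyncMessage structure:

// In a _test.go file; a stand-in message node replaces the real
// GraphSyncMessage type.
package gsbench

import (
	"bytes"
	"testing"

	ipld "github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/codec/dagcbor"
	"github.com/ipld/go-ipld-prime/fluent"
	"github.com/ipld/go-ipld-prime/node/basicnode"
)

func BenchmarkRoundtrip(b *testing.B) {
	// Build a small map node with a 1KiB payload to push through the codec.
	msg := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(ma fluent.MapAssembler) {
		ma.AssembleEntry("payload").AssignBytes(make([]byte, 1024))
	})
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		if err := dagcbor.Encode(msg, &buf); err != nil {
			b.Fatal(err)
		}
		if _, err := ipld.Decode(buf.Bytes(), dagcbor.Decode); err != nil {
			b.Fatal(err)
		}
	}
}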

* feat(requestid): use uuids for requestids

Ref: ipfs/go-graphsync#278
Closes: ipfs/go-graphsync#279
Closes: ipfs/go-graphsync#281

* fix(requestmanager): make collect test requests with uuids sortable

* fix(requestid): print requestids as string uuids in logs

* fix(requestid): use string as base type for RequestId

* chore(requestid): wrap requestid string in a struct
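Taken together, these requestid commits land on a shape roughly like the following sketch (illustrative, not the exact go-graphsync source): the UUID's 16 raw bytes ride in an unexported string, and the struct wrapper keeps the type opaque while staying comparable and usable as a map key.

package gsreqid

import "github.com/google/uuid"

// RequestID wraps an unexported string so callers cannot treat it as a
// plain string, while remaining comparable and usable as a map key.
type RequestID struct{ string }

// NewRequestID returns a fresh, globally unique request identifier
// backed by a random UUID's raw bytes.
func NewRequestID() RequestID {
	u := uuid.New()
	return RequestID{string(u[:])}
}

// String renders the id in canonical UUID form for logs.
func (r RequestID) String() string {
	u, err := uuid.FromBytes([]byte(r.string))
	if err != nil {
		return "<invalid-request-id>"
	}
	return u.String()
}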

* feat(libp2p): add v1.0.0 network compatibility

* chore(net): resolve most cbor + uuid merge problems

* feat(net): to/from ipld bindnode types, more cbor protoc improvements

* feat(net): introduce 2.0.0 protocol for dag-cbor
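Versioning rides on libp2p protocol negotiation; a hedged sketch of the registration pattern follows (the protocol ID strings are illustrative):

package gsnet

import (
	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/protocol"
)

// Illustrative protocol IDs: v2 speaks dag-cbor, v1 keeps protobuf.
const (
	protoV1 protocol.ID = "/ipfs/graphsync/1.0.0"
	protoV2 protocol.ID = "/ipfs/graphsync/2.0.0"
)

// register installs handlers for both versions; multistream negotiation
// then selects the newest protocol both peers support.
func register(h host.Host, handle network.StreamHandler) {
	h.SetStreamHandler(protoV2, handle)
	h.SetStreamHandler(protoV1, handle)
}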

* fix(net): more bindnode dag-cbor protocol fixes

Not quite working yet, still need some upstream fixes and no extensions work
has been attempted yet.

* chore(metadata): convert metadata to bindnode
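A minimal sketch of the bindnode roundtrip pattern this commit moves to, assuming a simplified one-field Metadata type (the real type also carries link information):

package main

import (
	"bytes"
	"fmt"

	"github.com/ipld/go-ipld-prime/codec/dagcbor"
	"github.com/ipld/go-ipld-prime/node/bindnode"
)

// Metadata is a simplified stand-in for go-graphsync's metadata type.
type Metadata struct {
	BlockPresent bool
}

func main() {
	// Wrap a Go value as a typed IPLD node; bindnode infers the schema.
	node := bindnode.Wrap(&Metadata{BlockPresent: true}, nil)

	// Encode the node's representation as dag-cbor.
	var buf bytes.Buffer
	if err := dagcbor.Encode(node.Representation(), &buf); err != nil {
		panic(err)
	}

	// Decode back through a typed prototype and unwrap to a Go value.
	proto := bindnode.Prototype((*Metadata)(nil), nil)
	nb := proto.Representation().NewBuilder()
	if err := dagcbor.Decode(nb, &buf); err != nil {
		panic(err)
	}
	out := bindnode.Unwrap(nb.Build()).(*Metadata)
	fmt.Println(out.BlockPresent) // true
}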

* chore(net,extensions): wire up IPLD extensions, expose as Node instead of []byte

* Extensions now working with new dag-cbor network protocol
* dag-cbor network protocol still not default, most tests are still exercising
  the existing v1 protocol
* Metadata now using bindnode instead of cbor-gen
* []byte for deferred extensions decoding is now replaced with datamodel.Node
  everywhere. Internal extensions now use some form of go-ipld-prime
  decode to convert them to local types (metadata using bindnode, others using
  direct inspection).
* V1 protocol also uses dag-cbor decode of extensions data and exports the
  bytes - this may be a breaking change for existing extensions - need to check
  whether this should be done differently. Maybe a try-decode and if it fails
  export a wrapped Bytes Node? (See the sketch below.)
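The try-decode fallback floated in the last bullet could look roughly like this (a sketch, not go-graphsync's actual code):

package gsext

import (
	ipld "github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/codec/dagcbor"
	"github.com/ipld/go-ipld-prime/datamodel"
	"github.com/ipld/go-ipld-prime/node/basicnode"
)

// decodeExtension tries to interpret raw extension bytes as dag-cbor;
// if that fails, it exposes them as a plain wrapped Bytes node instead.
func decodeExtension(raw []byte) datamodel.Node {
	if n, err := ipld.Decode(raw, dagcbor.Decode); err == nil {
		return n
	}
	return basicnode.NewBytes(raw)
}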

* fix(src): fix imports

* fix(mod): clean up go.mod

* fix(net): refactor message version format code to separate packages

* feat(net): activate v2 network as default

* fix(src): build error

* chore: remove GraphSyncMessage#Loggable

Ref: ipfs/go-graphsync#332 (comment)

* chore: remove intermediate v1.1 pb protocol message type

v1.1.0 was introduced to start the transition to UUID RequestIDs. That
change has since been combined with the switch to DAG-CBOR messaging format
for a v2.0.0 protocol. Thus, this interim v1.1.0 format is no longer needed
and has not been used at all in a released version of go-graphsync.

Fixes: filecoin-project/lightning-planning#14

* fix: clarify comments re dag-cbor extension data

As per discussion in ipfs/go-graphsync#338, we are going to be erroring on
extension data that is not properly dag-cbor encoded from now on.

* feat: new LinkMetadata iface, integrate metadata into Response type (#342)

* feat(metadata): new LinkMetadata iface, integrate metadata into Response type

* LinkMetadata wrapper around existing metadata type to allow for an easier
  backward-compat upgrade path (see the sketch below)
* integrate metadata directly into GraphSyncResponse type, moving it from an
  optional extension
* still deal with metadata as an extension for now; further work for v2 protocol
  will move it into the core message schema

Ref: ipfs/go-graphsync#335
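The shape of the new interface, sketched with assumed signatures and illustrative LinkAction values (check the graphsync package for the authoritative definitions):

package gsmeta

import "github.com/ipfs/go-cid"

// LinkAction records what happened for one link during traversal; the
// string values here are illustrative.
type LinkAction string

const (
	LinkActionPresent LinkAction = "p"
	LinkActionMissing LinkAction = "m"
)

// LinkMetadataIterator is invoked once per (CID, action) pair.
type LinkMetadataIterator func(cid.Cid, LinkAction)

// LinkMetadata hides the underlying metadata representation so it can
// change between protocol versions without breaking callers.
type LinkMetadata interface {
	// Length returns the number of metadata entries.
	Length() int64
	// Iterate walks every entry in the metadata.
	Iterate(LinkMetadataIterator)
}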

* feat(metadata): move metadata to core protocol, only use extension in v1 proto

* fix(metadata): bindnode expects Go enum strings to be at the type level

* fix(metadata): minor fixes, tidy up naming

* fix(metadata): make gofmt and staticcheck happy

* fix(metadata): docs and minor tweaks after review

Co-authored-by: Daniel Martí <mvdan@mvdan.cc>

* fix: avoid double-encode for extension size estimation

Closes: filecoin-project/lightning-planning#15

* feat(requesttype): introduce RequestType enum to replace cancel&update bools (#352)

Closes: ipfs/go-graphsync#345
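A sketch of the enum replacing the two booleans (constant values illustrative):

package gstypes

// RequestType replaces the old IsCancel/IsUpdate boolean pair with a
// single discriminator.
type RequestType string

const (
	// RequestTypeNew opens a brand-new request.
	RequestTypeNew RequestType = "n"
	// RequestTypeCancel cancels an in-progress request.
	RequestTypeCancel RequestType = "c"
	// RequestTypeUpdate sends new extension data for an in-progress request.
	RequestTypeUpdate RequestType = "u"
)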

* fix(metadata): extend round-trip tests to byte representation (#350)

* feat!(messagev2): tweak dag-cbor message schema (#354)

* feat!(messagev2): tweak dag-cbor message schema

For:

1. Efficiency: compacting the noisy structures into tuple representations (see
   the sketch after this list) and making top-level components of a message
   optional.
2. Migrations: providing a secondary mechanism to lean on for versioning if we
   want a gentler upgrade path than libp2p protocol versioning.

Closes: ipfs/go-graphsync#351
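To make the tuple idea concrete, a self-contained sketch with a simplified Block type (the real schema covers the whole message): a struct with tuple representation encodes as a bare two-element CBOR list rather than a map with string keys.

package main

import (
	"bytes"
	"fmt"

	ipld "github.com/ipld/go-ipld-prime"
	"github.com/ipld/go-ipld-prime/codec/dagcbor"
	"github.com/ipld/go-ipld-prime/node/bindnode"
)

// Block mirrors the schema struct below; with tuple representation its
// field names never appear on the wire.
type Block struct {
	Prefix []byte
	Data   []byte
}

const schemaText = `
type Block struct {
	prefix Bytes
	data   Bytes
} representation tuple
`

func main() {
	ts, err := ipld.LoadSchemaBytes([]byte(schemaText))
	if err != nil {
		panic(err)
	}
	node := bindnode.Wrap(&Block{Prefix: []byte{1}, Data: []byte("x")}, ts.TypeByName("Block"))

	var buf bytes.Buffer
	if err := dagcbor.Encode(node.Representation(), &buf); err != nil {
		panic(err)
	}
	fmt.Printf("%x\n", buf.Bytes()) // a compact 2-element cbor list
}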

* fix(messagev2): adjust schema per feedback

* feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID (#355)

* feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID

Closes: ipfs/go-graphsync#349
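A hedged sketch of the unified surface this enables (assumed signatures; the real methods live on graphsync.GraphExchange):

package gsapi

import (
	"context"

	graphsync "github.com/ipfs/go-graphsync"
)

// UnifiedControl sketches the single per-RequestID control surface that
// replaces the separate request-side and response-side variants.
type UnifiedControl interface {
	// Pause suspends an in-progress request or response by its id.
	Pause(ctx context.Context, id graphsync.RequestID) error
	// Unpause resumes it, optionally attaching new extension data.
	Unpause(ctx context.Context, id graphsync.RequestID, extensions ...graphsync.ExtensionData) error
	// Cancel aborts it entirely.
	Cancel(ctx context.Context, id graphsync.RequestID) error
}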

* fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID

* fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID

when using error type T, use *T with As, rather than **T

* fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID

* fixup! feat(graphsync): unify req & resp Pause, Unpause & Cancel by RequestID

Co-authored-by: Daniel Martí <mvdan@mvdan.cc>

* feat: SendUpdates() API to send only extension data via an existing request

* fix(responsemanager): send update while completing

If a request has finished selector traversal but is still sending blocks,
I think it should be possible to send updates. As a side effect, this
fixes our race.

Logically, this makes sense, because our external indicator that we're
done (the completed response listener) has not been called.

* fix(requestmanager): revert change to pointer type

* Refactor async loading for simplicity and correctness (#356)

* feat(reconciledloader): first working version of reconciled loader

* feat(traversalrecorder): add better recorder for traversals

* feat(reconciledloader): pipe reconciled loader through code

style(lint): fix static checks

* Update requestmanager/reconciledloader/injest.go

Co-authored-by: Rod Vagg <rod@vagg.org>

* feat(reconciledloader): respond to PR comments

Co-authored-by: Rod Vagg <rod@vagg.org>

* fix(requestmanager): update test for rebase

Co-authored-by: Daniel Martí <mvdan@mvdan.cc>
Co-authored-by: hannahhoward <hannah@hannahhoward.net>
3 people committed Feb 18, 2022
1 parent 2d87adb commit 02a1883
32 changes: 16 additions & 16 deletions messagequeue/messagequeue_test.go
@@ -39,7 +39,7 @@ func TestStartupAndShutdown(t *testing.T) {
 
     messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
     messageQueue.Startup()
-    id := graphsync.RequestID(rand.Int31())
+    id := graphsync.NewRequestID()
     priority := graphsync.Priority(rand.Int31())
     ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
     selector := ssb.Matcher().Node()
@@ -77,7 +77,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
 
     messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
     messageQueue.Startup()
-    id := graphsync.RequestID(rand.Int31())
+    id := graphsync.NewRequestID()
     priority := graphsync.Priority(rand.Int31())
     ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
     selector := ssb.Matcher().Node()
@@ -128,11 +128,11 @@ func TestProcessingNotification(t *testing.T) {
     waitGroup.Add(1)
     blks := testutil.GenerateBlocksOfSize(3, 128)
 
-    responseID := graphsync.RequestID(rand.Int31())
+    responseID := graphsync.NewRequestID()
     extensionName := graphsync.ExtensionName("graphsync/awesome")
     extension := graphsync.ExtensionData{
         Name: extensionName,
-        Data: testutil.RandomBytes(100),
+        Data: basicnode.NewBytes(testutil.RandomBytes(100)),
     }
     status := graphsync.RequestCompletedFull
     blkData := testutil.NewFakeBlockData()
@@ -199,7 +199,7 @@ func TestDedupingMessages(t *testing.T) {
     messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
     messageQueue.Startup()
     waitGroup.Add(1)
-    id := graphsync.RequestID(rand.Int31())
+    id := graphsync.NewRequestID()
     priority := graphsync.Priority(rand.Int31())
     ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
     selector := ssb.Matcher().Node()
@@ -210,11 +210,11 @@ func TestDedupingMessages(t *testing.T) {
     })
     // wait for send attempt
     waitGroup.Wait()
-    id2 := graphsync.RequestID(rand.Int31())
+    id2 := graphsync.NewRequestID()
     priority2 := graphsync.Priority(rand.Int31())
     selector2 := ssb.ExploreAll(ssb.Matcher()).Node()
     root2 := testutil.GenerateCids(1)[0]
-    id3 := graphsync.RequestID(rand.Int31())
+    id3 := graphsync.NewRequestID()
     priority3 := graphsync.Priority(rand.Int31())
     selector3 := ssb.ExploreIndex(0, ssb.Matcher()).Node()
     root3 := testutil.GenerateCids(1)[0]
@@ -231,7 +231,7 @@ func TestDedupingMessages(t *testing.T) {
     require.Len(t, requests, 1, "number of requests in first message was not 1")
     request := requests[0]
     require.Equal(t, id, request.ID())
-    require.False(t, request.IsCancel())
+    require.Equal(t, request.Type(), graphsync.RequestTypeNew)
     require.Equal(t, priority, request.Priority())
     require.Equal(t, selector, request.Selector())
 
@@ -241,11 +241,11 @@ func TestDedupingMessages(t *testing.T) {
     require.Len(t, requests, 2, "number of requests in second message was not 2")
     for _, request := range requests {
         if request.ID() == id2 {
-            require.False(t, request.IsCancel())
+            require.Equal(t, request.Type(), graphsync.RequestTypeNew)
             require.Equal(t, priority2, request.Priority())
             require.Equal(t, selector2, request.Selector())
         } else if request.ID() == id3 {
-            require.False(t, request.IsCancel())
+            require.Equal(t, request.Type(), graphsync.RequestTypeNew)
             require.Equal(t, priority3, request.Priority())
             require.Equal(t, selector3, request.Selector())
         } else {
@@ -385,8 +385,8 @@ func TestNetworkErrorClearResponses(t *testing.T) {
     messagesSent := make(chan gsmsg.GraphSyncMessage)
     resetChan := make(chan struct{}, 1)
     fullClosedChan := make(chan struct{}, 1)
-    requestID1 := graphsync.RequestID(rand.Int31())
-    requestID2 := graphsync.RequestID(rand.Int31())
+    requestID1 := graphsync.NewRequestID()
+    requestID2 := graphsync.NewRequestID()
     messageSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
     var waitGroup sync.WaitGroup
     messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
@@ -403,7 +403,7 @@ func TestNetworkErrorClearResponses(t *testing.T) {
 
     messageQueue.AllocateAndBuildMessage(uint64(len(blks[0].RawData())), func(b *Builder) {
         b.AddBlock(blks[0])
-        b.AddLink(requestID1, cidlink.Link{Cid: blks[0].Cid()}, true)
+        b.AddLink(requestID1, cidlink.Link{Cid: blks[0].Cid()}, graphsync.LinkActionPresent)
         b.SetSubscriber(requestID1, subscriber)
     })
     waitGroup.Wait()
@@ -431,16 +431,16 @@ func TestNetworkErrorClearResponses(t *testing.T) {
     messageQueue.AllocateAndBuildMessage(uint64(len(blks[1].RawData())), func(b *Builder) {
         b.AddBlock(blks[1])
         b.SetResponseStream(requestID1, fc1)
-        b.AddLink(requestID1, cidlink.Link{Cid: blks[1].Cid()}, true)
+        b.AddLink(requestID1, cidlink.Link{Cid: blks[1].Cid()}, graphsync.LinkActionPresent)
     })
     messageQueue.AllocateAndBuildMessage(uint64(len(blks[2].RawData())), func(b *Builder) {
         b.AddBlock(blks[2])
         b.SetResponseStream(requestID1, fc1)
-        b.AddLink(requestID1, cidlink.Link{Cid: blks[2].Cid()}, true)
+        b.AddLink(requestID1, cidlink.Link{Cid: blks[2].Cid()}, graphsync.LinkActionPresent)
     })
    messageQueue.AllocateAndBuildMessage(uint64(len(blks[3].RawData())), func(b *Builder) {
         b.SetResponseStream(requestID2, fc2)
-        b.AddLink(requestID2, cidlink.Link{Cid: blks[3].Cid()}, true)
+        b.AddLink(requestID2, cidlink.Link{Cid: blks[3].Cid()}, graphsync.LinkActionPresent)
         b.AddBlock(blks[3])
     })

