Skip to content

Commit

Permalink
Make routing table bucket size configurable (#396)
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Oct 4, 2019
1 parent e216d3c commit fed99af
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 17 deletions.
10 changes: 7 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type IpfsDHT struct {
stripedPutLocks [256]sync.Mutex

protocols []protocol.ID // DHT protocols

bucketSize int
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -83,10 +85,11 @@ var (
// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
cfg.BucketSize = KValue
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols)
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand Down Expand Up @@ -131,8 +134,8 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
rt := kb.NewRoutingTable(KValue, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

cmgr := h.ConnManager()
rt.PeerAdded = func(p peer.ID) {
Expand All @@ -153,6 +156,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
birth: time.Now(),
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
}

dht.ctx = dht.newContextWithLocalTags(ctx)
Expand Down
9 changes: 3 additions & 6 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ import (
"github.com/whyrusleeping/base32"
)

// CloserPeerCount is the number of closer peers to send on requests.
// It is initialized from KValue (the Kademlia bucket size constant).
var CloserPeerCount = KValue

// dhtHandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)

Expand Down Expand Up @@ -69,7 +66,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
resp.Record = rec

// Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
if len(closer) > 0 {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
closerinfos := pstore.PeerInfos(dht.peerstore, closer)
Expand Down Expand Up @@ -265,7 +262,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
if targetPid == dht.self {
closest = []peer.ID{dht.self}
} else {
closest = dht.betterPeersToQuery(pmes, p, CloserPeerCount)
closest = dht.betterPeersToQuery(pmes, p, dht.bucketSize)

// Never tell a peer about itself.
if targetPid != p {
Expand Down Expand Up @@ -343,7 +340,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
}

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
if closer != nil {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, closer)
Expand Down
7 changes: 4 additions & 3 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
return nil, kb.ErrLookupFailure
}

out := make(chan peer.ID, KValue)
out := make(chan peer.ID, dht.bucketSize)

// since the query doesnt actually pass our context down
// we have to hack this here. whyrusleeping isnt a huge fan of goprocess
Expand Down Expand Up @@ -104,8 +104,9 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee

if res != nil && res.queriedSet != nil {
sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key))
if len(sorted) > KValue {
sorted = sorted[:KValue]
l := len(sorted)
if l > dht.bucketSize {
sorted = sorted[:dht.bucketSize]
}

for _, p := range sorted {
Expand Down
19 changes: 15 additions & 4 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ var (

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
}

// Apply applies the given options to this Option
Expand Down Expand Up @@ -107,3 +108,13 @@ func Protocols(protocols ...protocol.ID) Option {
return nil
}
}

// BucketSize sets the bucket size used by the routing table.
//
// When this option is not supplied, the bucket size defaults to 20.
func BucketSize(bucketSize int) Option {
	return func(opts *Options) error {
		opts.BucketSize = bucketSize
		return nil
	}
}
2 changes: 1 addition & 1 deletion routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) {
// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
var providers []peer.AddrInfo
for p := range dht.FindProvidersAsync(ctx, c, KValue) {
for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
providers = append(providers, p)
}
return providers, nil
Expand Down

0 comments on commit fed99af

Please sign in to comment.