Skip to content

Commit

Permalink
custom ProviderManager that brokers AddrInfos (#751)
Browse files Browse the repository at this point in the history
- Introduce `ProviderStore` interface to model a source for provider records which includes addresses (not just peer ids). Make adjustments to pre-existing `ProviderManager` implementation to meet the `ProviderStore` interface.
- The DHT option `ProvidersOptions` has been removed. It was used to pass options to the default `ProviderManager`. Going forward, users can create their own instance of `ProviderManager` and pass it in using the new `ProviderStore` DHT option.
  • Loading branch information
petar committed Oct 26, 2021
1 parent 5a8437e commit 7724838
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 85 deletions.
24 changes: 17 additions & 7 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ type IpfsDHT struct {
datastore ds.Datastore // Local data

routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
// ProviderManager stores & manages the provider records for this Dht peer.
ProviderManager *providers.ProviderManager
// providerStore stores & manages the provider records for this Dht peer.
providerStore providers.ProviderStore

// manages Routing Table refresh
rtRefreshManager *rtrefresh.RtRefreshManager
Expand Down Expand Up @@ -221,7 +221,9 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}
dht.proc.Go(sn.subscribe)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok {
dht.proc.AddChild(mgr.Process())
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
Expand Down Expand Up @@ -338,11 +340,14 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, err
if cfg.ProviderStore != nil {
dht.providerStore = cfg.ProviderStore
} else {
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
if err != nil {
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
}
}
dht.ProviderManager = pm

dht.rtFreezeTimeout = rtFreezeTimeout

Expand Down Expand Up @@ -413,6 +418,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
return rt, err
}

// ProviderStore returns the provider storage object for storing and retrieving provider records.
func (dht *IpfsDHT) ProviderStore() providers.ProviderStore {
return dht.providerStore
}

// GetRoutingTableDiversityStats returns the diversity stats for the Routing Table.
func (dht *IpfsDHT) GetRoutingTableDiversityStats() []peerdiversity.CplDiversityStats {
return dht.routingTable.GetDiversityStats()
Expand Down
20 changes: 8 additions & 12 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ const DefaultPrefix protocol.ID = "/ipfs"

type Option = dhtcfg.Option

// ProviderStore sets the provider storage manager.
func ProviderStore(ps providers.ProviderStore) Option {
return func(c *dhtcfg.Config) error {
c.ProviderStore = ps
return nil
}
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
Expand Down Expand Up @@ -236,18 +244,6 @@ func DisableValues() Option {
}
}

// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore, cahing
// them in between. These options are passed to the provider manager allowing
// customisation of things like the GC interval and cache implementation.
func ProvidersOptions(opts []providers.Option) Option {
return func(c *dhtcfg.Config) error {
c.ProvidersOptions = opts
return nil
}
}

// QueryFilter sets a function that approves which peers may be dialed in a query
func QueryFilter(filter QueryFilterFunc) Option {
return func(c *dhtcfg.Config) error {
Expand Down
8 changes: 4 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestLocalProvides(t *testing.T) {

for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].ProviderManager.GetProviders(ctx, c.Hash())
provs, _ := dhts[i].ProviderStore().GetProviders(ctx, c.Hash())
if len(provs) > 0 {
t.Fatal("shouldnt know this")
}
Expand Down Expand Up @@ -1285,7 +1285,7 @@ func TestClientModeConnect(t *testing.T) {

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderManager.AddProvider(ctx, c.Hash(), p)
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
Expand Down Expand Up @@ -1548,7 +1548,7 @@ func TestProvideDisabled(t *testing.T) {
if err != routing.ErrNotSupported {
t.Fatal("get should have failed on node B")
}
provs := dhtB.ProviderManager.GetProviders(ctx, kHash)
provs, _ := dhtB.ProviderStore().GetProviders(ctx, kHash)
if len(provs) != 0 {
t.Fatal("node B should not have found local providers")
}
Expand All @@ -1564,7 +1564,7 @@ func TestProvideDisabled(t *testing.T) {
t.Fatal("node A should not have found providers")
}
}
provAddrs := dhtA.ProviderManager.GetProviders(ctx, kHash)
provAddrs, _ := dhtA.ProviderStore().GetProviders(ctx, kHash)
if len(provAddrs) != 0 {
t.Fatal("node A should not have found local providers")
}
Expand Down
14 changes: 8 additions & 6 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

ctx, cancel := context.WithCancel(context.Background())

pm, err := providers.NewProviderManager(ctx, h.ID(), dhtcfg.Datastore)
pm, err := providers.NewProviderManager(ctx, h.ID(), h.Peerstore(), dhtcfg.Datastore)
if err != nil {
cancel()
return nil, err
Expand Down Expand Up @@ -762,7 +762,7 @@ func (dht *FullRT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err e
logger.Debugw("providing", "cid", key, "mh", internal.LoggableProviderRecordBytes(keyMH))

// add self locally
dht.ProviderManager.AddProvider(ctx, keyMH, dht.h.ID())
dht.ProviderManager.AddProvider(ctx, keyMH, peer.AddrInfo{ID: dht.h.ID()})
if !brdcst {
return nil
}
Expand Down Expand Up @@ -1209,13 +1209,15 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ps = peer.NewLimitedSet(count)
}

provs := dht.ProviderManager.GetProviders(ctx, key)
provs, err := dht.ProviderManager.GetProviders(ctx, key)
if err != nil {
return
}
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.h.Peerstore().PeerInfo(p)
if ps.TryAdd(p.ID) {
select {
case peerOut <- pi:
case peerOut <- p:
case <-ctx.Done():
return
}
Expand Down
17 changes: 5 additions & 12 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
pstore "github.com/libp2p/go-libp2p-peerstore"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -318,13 +317,11 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

// setup providers
providers := dht.ProviderManager.GetProviders(ctx, key)

if len(providers) > 0 {
// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
infos := pstore.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
providers, err := dht.providerStore.GetProviders(ctx, key)
if err != nil {
return nil, err
}
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), providers)

// Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
Expand Down Expand Up @@ -362,11 +359,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
continue
}

if pi.ID != dht.self { // don't add own addrs.
// add the received addresses to our peerstore.
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
}
dht.ProviderManager.AddProvider(ctx, key, p)
dht.providerStore.AddProvider(ctx, key, peer.AddrInfo{ID: p})
}

return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Config struct {
MaxRecordAge time.Duration
EnableProviders bool
EnableValues bool
ProvidersOptions []providers.Option
ProviderStore providers.ProviderStore
QueryPeerFilter QueryFilterFunc

RoutingTable struct {
Expand Down
33 changes: 26 additions & 7 deletions providers/providers_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
peerstoreImpl "github.com/libp2p/go-libp2p-peerstore"

lru "github.com/hashicorp/golang-lru/simplelru"
ds "github.com/ipfs/go-datastore"
Expand All @@ -30,12 +32,20 @@ var lruCacheSize = 256
var batchBufferSize = 256
var log = logging.Logger("providers")

// ProviderStore represents a store that associates peers and their addresses to keys.
type ProviderStore interface {
AddProvider(ctx context.Context, key []byte, prov peer.AddrInfo) error
GetProviders(ctx context.Context, key []byte) ([]peer.AddrInfo, error)
}

// ProviderManager adds and pulls providers out of the datastore,
// caching them in between
type ProviderManager struct {
self peer.ID
// all non channel fields are meant to be accessed only within
// the run method
cache lru.LRUCache
pstore peerstore.Peerstore
dstore *autobatch.Datastore

newprovs chan *addProv
Expand All @@ -45,6 +55,8 @@ type ProviderManager struct {
cleanupInterval time.Duration
}

var _ ProviderStore = (*ProviderManager)(nil)

// Option is a function that sets a provider manager option.
type Option func(*ProviderManager) error

Expand Down Expand Up @@ -86,10 +98,12 @@ type getProv struct {
}

// NewProviderManager constructor
func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
func NewProviderManager(ctx context.Context, local peer.ID, ps peerstore.Peerstore, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
pm := new(ProviderManager)
pm.self = local
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.pstore = ps
pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
cache, err := lru.NewLRU(lruCacheSize, nil)
if err != nil {
Expand Down Expand Up @@ -214,14 +228,19 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
}

// AddProvider adds a provider
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, provInfo peer.AddrInfo) error {
if provInfo.ID != pm.self { // don't add own addrs.
pm.pstore.AddAddrs(provInfo.ID, provInfo.Addrs, peerstore.ProviderAddrTTL)
}
prov := &addProv{
key: k,
val: val,
val: provInfo.ID,
}
select {
case pm.newprovs <- prov:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down Expand Up @@ -255,21 +274,21 @@ func mkProvKey(k []byte) string {

// GetProviders returns the set of providers for the given key.
// This method _does not_ copy the set. Do not modify it.
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) ([]peer.AddrInfo, error) {
gp := &getProv{
key: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
}
select {
case <-ctx.Done():
return nil
return nil, ctx.Err()
case pm.getprovs <- gp:
}
select {
case <-ctx.Done():
return nil
return nil, ctx.Err()
case peers := <-gp.resp:
return peers
return peerstoreImpl.PeerInfos(pm.pstore, peers), nil
}
}

Expand Down

0 comments on commit 7724838

Please sign in to comment.