Skip to content

Commit

Permalink
Use relay.nos.social for relay lookups
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Nov 30, 2023
1 parent 0e60f7f commit 4f0b916
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 93 deletions.
24 changes: 23 additions & 1 deletion cmd/crossposting-service/di/inject_adapters.go
@@ -1,6 +1,7 @@
package di

import (
"context"
"database/sql"

"github.com/boreq/errors"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/planetary-social/nos-crossposting-service/service/adapters/twitter"
"github.com/planetary-social/nos-crossposting-service/service/app"
"github.com/planetary-social/nos-crossposting-service/service/config"
"github.com/planetary-social/nos-crossposting-service/service/domain"
)

var sqliteAdaptersSet = wire.NewSet(
Expand Down Expand Up @@ -62,7 +64,7 @@ var adaptersSet = wire.NewSet(
wire.Bind(new(app.AccountIDGenerator), new(*adapters.IDGenerator)),

adapters.NewRelaySource,
adapters.NewPurplePages,
newPurplePages,
wire.Bind(new(app.RelaySource), new(*adapters.RelaySource)),

adapters.NewRelayEventDownloader,
Expand Down Expand Up @@ -115,6 +117,26 @@ var mockTxAdaptersSet = wire.NewSet(
wire.Bind(new(app.Publisher), new(*mocks.Publisher)),
)

var purplePagesAddresses = []domain.RelayAddress{
domain.MustNewRelayAddress("wss://purplepag.es"),
domain.MustNewRelayAddress("wss://relay.nos.social"),
}

func newPurplePages(ctx context.Context, logger logging.Logger, metrics app.Metrics) ([]*adapters.CachedPurplePages, error) {
var result []*adapters.CachedPurplePages

for _, address := range purplePagesAddresses {
v, err := adapters.NewPurplePages(ctx, address, logger, metrics)
if err != nil {
return nil, errors.Wrap(err, "error creating purple pages")
}

result = append(result, adapters.NewCachedPurplePages(logger, v))
}

return result, nil
}

func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn {
return func(db *sql.DB, tx *sql.Tx) (app.Adapters, error) {
return buildTransactionSqliteAdapters(db, tx, deps)
Expand Down
4 changes: 2 additions & 2 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions service/adapters/prometheus/prometheus.go
Expand Up @@ -124,7 +124,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) {
Name: "purple_pages_lookups",
Help: "Number of purple pages lookups.",
},
[]string{labelResult, labelErrorDescription},
[]string{labelResult, labelErrorDescription, labelRelayAddress},
)
tweetCreatedCountPerAccountGauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -257,10 +257,11 @@ func (p *Prometheus) ReportSubscriptionQueueLength(topic string, n int) {
p.subscriptionQueueLengthGauge.With(prometheus.Labels{labelTopic: topic}).Set(float64(n))
}

func (p *Prometheus) ReportPurplePagesLookupResult(err *error) {
func (p *Prometheus) ReportPurplePagesLookupResult(address domain.RelayAddress, err *error) {
labels := prometheus.Labels{
labelResult: labelResultValueSuccess,
labelErrorDescription: "none",
labelRelayAddress: address.String(),
}
if *err != nil {
labels[labelResult] = labelResultValueError
Expand Down
14 changes: 9 additions & 5 deletions service/adapters/purple_pages.go
Expand Up @@ -2,6 +2,7 @@ package adapters

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -19,8 +20,6 @@ var (
errLookupFoundNoEvents = errors.New("lookup found no events")
)

var purplePagesAddress = domain.MustNewRelayAddress("wss://purplepag.es")

const purplePagesLookupTimeout = 10 * time.Second

const numLookups = 2
Expand All @@ -34,21 +33,22 @@ type PurplePages struct {

func NewPurplePages(
ctx context.Context,
address domain.RelayAddress,
logger logging.Logger,
metrics app.Metrics,
) (*PurplePages, error) {
connection := NewRelayConnection(purplePagesAddress, logger)
connection := NewRelayConnection(address, logger)
go connection.Run(ctx)

return &PurplePages{
logger: logger,
logger: logger.New(fmt.Sprintf("PurplePages(%s)", address.String())),
metrics: metrics,
connection: connection,
}, nil
}

func (p *PurplePages) GetRelays(ctx context.Context, publicKey domain.PublicKey) (result []domain.RelayAddress, err error) {
defer p.metrics.ReportPurplePagesLookupResult(&err)
defer p.metrics.ReportPurplePagesLookupResult(p.connection.Address(), &err)

p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down Expand Up @@ -197,6 +197,10 @@ func (p *PurplePages) getRelaysFromContacts(ctx context.Context, publicKey domai
return nil, errors.New("timeout")
}

func (p *PurplePages) Address() domain.RelayAddress {
return p.connection.Address()
}

type relaysOrError struct {
Err error
Addresses []domain.RelayAddress
Expand Down
91 changes: 91 additions & 0 deletions service/adapters/purple_pages_cache.go
@@ -0,0 +1,91 @@
package adapters

import (
"context"
"fmt"
"sync"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/domain"
)

type CachedPurplePages struct {
logger logging.Logger
purplePages *PurplePages
cache *RelayAddressCache
}

func NewCachedPurplePages(logger logging.Logger, purplePages *PurplePages) *CachedPurplePages {
return &CachedPurplePages{
logger: logger.New(fmt.Sprintf("CachedPurplePages(%s)", purplePages.Address().String())),
purplePages: purplePages,
cache: NewRelayAddressCache(),
}
}

func (p CachedPurplePages) GetRelays(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) {
entry, ok := p.cache.Get(publicKey)
if ok {
if time.Since(entry.T) < refreshPurplePagesAfter {
return entry.Addresses, nil
}
}

newRelayAddresses, err := p.getRelaysFromPurplePages(ctx, publicKey)
if err != nil {
return nil, errors.Wrap(err, "error querying purple pages")
}

p.cache.Set(publicKey, newRelayAddresses)
return newRelayAddresses, nil
}

func (p CachedPurplePages) getRelaysFromPurplePages(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) {
relayAddressesFromPurplePages, err := p.purplePages.GetRelays(ctx, publicKey)
if err != nil {
if errors.Is(err, ErrRelayListNotFoundInPurplePages) {
p.logger.Debug().WithError(err).Message("relay list not found in purple pages")
return nil, nil
}
return nil, errors.Wrap(err, "error querying purple pages")
}
return relayAddressesFromPurplePages, nil
}

func (p CachedPurplePages) Address() domain.RelayAddress {
return p.purplePages.Address()
}

type RelayAddressCache struct {
m map[domain.PublicKey]Entry
lock sync.Mutex
}

func NewRelayAddressCache() *RelayAddressCache {
return &RelayAddressCache{m: make(map[domain.PublicKey]Entry)}
}

func (c *RelayAddressCache) Set(publicKey domain.PublicKey, addresses []domain.RelayAddress) {
c.lock.Lock()
defer c.lock.Unlock()

c.m[publicKey] = Entry{
T: time.Now(),
Addresses: addresses,
}
}

func (c *RelayAddressCache) Get(publicKey domain.PublicKey) (Entry, bool) {
c.lock.Lock()
defer c.lock.Unlock()

v, ok := c.m[publicKey]
return v, ok
}

type Entry struct {
T time.Time
Addresses []domain.RelayAddress
}
92 changes: 10 additions & 82 deletions service/adapters/relay_source.go
Expand Up @@ -2,10 +2,9 @@ package adapters

import (
"context"
"sync"
"fmt"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/internal"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/domain"
Expand All @@ -21,97 +20,26 @@ var hardcodedRelayAddresses = []domain.RelayAddress{

type RelaySource struct {
logger logging.Logger
purplePages *PurplePages
cache *RelayAddressCache
purplePages []*CachedPurplePages
}

func NewRelaySource(logger logging.Logger, purplePages *PurplePages) *RelaySource {
func NewRelaySource(logger logging.Logger, purplePages []*CachedPurplePages) *RelaySource {
return &RelaySource{
logger: logger,
purplePages: purplePages,
cache: NewRelayAddressCache(),
}
}

func (p RelaySource) GetRelays(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) {
result := internal.NewEmptySet[domain.RelayAddress]()
result := internal.NewSet[domain.RelayAddress](hardcodedRelayAddresses)

for _, relayAddress := range hardcodedRelayAddresses {
result.Put(relayAddress)
}

relayAddressesFromPurplePages, err := p.getRelaysFromPurplePagesOrCache(ctx, publicKey)
if err != nil {
return nil, errors.Wrap(err, "error getting relays from purple pages")
}

for _, relayAddress := range relayAddressesFromPurplePages {
result.Put(relayAddress)
}

return result.List(), nil
}

func (p RelaySource) getRelaysFromPurplePagesOrCache(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) {
var previousEntries []domain.RelayAddress

entry, ok := p.cache.Get(publicKey)
if ok {
previousEntries = entry.Addresses
if time.Since(entry.T) < refreshPurplePagesAfter {
return previousEntries, nil
}
}

relayAddressesFromPurplePages, err := p.getRelaysFromPurplePages(ctx, publicKey)
if err != nil {
return nil, errors.Wrap(err, "error querying purple pages")
}

p.cache.Set(publicKey, relayAddressesFromPurplePages)
return relayAddressesFromPurplePages, nil
}

func (p RelaySource) getRelaysFromPurplePages(ctx context.Context, publicKey domain.PublicKey) ([]domain.RelayAddress, error) {
relayAddressesFromPurplePages, err := p.purplePages.GetRelays(ctx, publicKey)
if err != nil {
if errors.Is(err, ErrRelayListNotFoundInPurplePages) {
p.logger.Debug().WithError(err).Message("relay list not found in purple pages")
return nil, nil
for _, purplePages := range p.purplePages {
relayAddressesFromPurplePages, err := purplePages.GetRelays(ctx, publicKey)
if err != nil {
return nil, fmt.Errorf("error getting relays from '%s'", purplePages.Address().String())
}
return nil, errors.Wrap(err, "error querying purple pages")
result.PutMany(relayAddressesFromPurplePages)
}
return relayAddressesFromPurplePages, nil
}

type RelayAddressCache struct {
m map[domain.PublicKey]Entry
lock sync.Mutex
}

func NewRelayAddressCache() *RelayAddressCache {
return &RelayAddressCache{m: make(map[domain.PublicKey]Entry)}
}

func (c *RelayAddressCache) Set(publicKey domain.PublicKey, addresses []domain.RelayAddress) {
c.lock.Lock()
defer c.lock.Unlock()

c.m[publicKey] = Entry{
T: time.Now(),
Addresses: addresses,
}
}

func (c *RelayAddressCache) Get(publicKey domain.PublicKey) (Entry, bool) {
c.lock.Lock()
defer c.lock.Unlock()

v, ok := c.m[publicKey]
return v, ok
}

type Entry struct {
T time.Time
Addresses []domain.RelayAddress
return result.List(), nil
}
2 changes: 1 addition & 1 deletion service/app/app.go
Expand Up @@ -142,7 +142,7 @@ type Metrics interface {
ReportCallingTwitterAPIToPostATweet(err error)
ReportCallingTwitterAPIToGetAUser(err error)
ReportSubscriptionQueueLength(topic string, n int)
ReportPurplePagesLookupResult(err *error)
ReportPurplePagesLookupResult(address domain.RelayAddress, err *error)
ReportTweetCreatedCountPerAccount(m map[accounts.AccountID]int)
ReportNumberOfAccounts(count int)
ReportNumberOfLinkedPublicKeys(count int)
Expand Down

0 comments on commit 4f0b916

Please sign in to comment.