Skip to content

Commit

Permalink
Publish to IPNI in background
Browse files Browse the repository at this point in the history
Don not block graphsync hook while publishing to IPNI. Add logging to
show publication progress.
  • Loading branch information
masih committed Jul 11, 2023
1 parent 5ddf929 commit 91aebc9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 10 deletions.
6 changes: 5 additions & 1 deletion exchange/fx_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ func (e *FxExchange) Start(ctx context.Context) error {
return err
}
e.gx.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) {
e.pub.notifyReceivedLink(blockData.Link())
go func(link ipld.Link) {
log.Debugw("Notifying link to IPNI publisher...", "link", link)
e.pub.notifyReceivedLink(link)
log.Debugw("Successfully notified link to IPNI publisher", "link", link)
}(blockData.Link())
})
}

Expand Down
35 changes: 26 additions & 9 deletions exchange/ipni_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exchange

import (
"context"
"sync/atomic"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -48,23 +49,39 @@ func (p *ipniPublisher) Start(ctx context.Context) error {
}
go func() {
unpublished := make(map[cid.Cid]struct{})
var publishing atomic.Bool
maybePublish := func() {
remaining := len(unpublished)
if remaining > 0 {
mhs := make([]multihash.Multihash, 0, remaining)
for c := range unpublished {
mhs = append(mhs, c.Hash())
delete(unpublished, c)
}
if err := p.publish(mhs); err != nil {
if remaining == 0 {
log.Debug("No remaining entries to publish")
return
}
if publishing.Load() {
log.Debugw("IPNI publishing in progress", "remaining", remaining)
return
}
log.Debugw("Attempting to publish links to IPNI", "count", remaining)
mhs := make([]multihash.Multihash, 0, remaining)
for c := range unpublished {
mhs = append(mhs, c.Hash())
delete(unpublished, c)
}
publishing.Store(true)
go func(entries []multihash.Multihash) {
log.Debug("IPNI publish attempt in progress...")
defer func() {
publishing.Store(false)
log.Debug("Finished attempt to publish to IPNI.")
}()
if err := p.publish(entries); err != nil {
log.Errorw("Failed to publish to IPNI", "entriesCount", len(mhs), "err", err)
}
}
}(mhs)
}
for {
select {
case <-p.ctx.Done():
log.Infow("IPNI publisher stopped")
log.Infow("IPNI publisher stopped", "remainingLinks", len(unpublished))
return
case <-p.ipniPublishTicker.C:
maybePublish()
Expand Down

0 comments on commit 91aebc9

Please sign in to comment.