diff --git a/traversal/operation.go b/traversal/operation.go index 9839fbc53..90abd8807 100644 --- a/traversal/operation.go +++ b/traversal/operation.go @@ -2,6 +2,7 @@ package traversal import ( "context" + "errors" "sync/atomic" "github.com/anacrolix/chansync" @@ -121,20 +122,35 @@ func (op *Operation) Stalled() events.Active { return op.stalled.Active() } +func (op *Operation) addNodeLocked(n types.AddrMaybeId) (err error) { + if _, ok := op.queried[addrString(n.Addr.String())]; ok { + err = errors.New("already queried") + return + } + if !op.input.NodeFilter(n) { + err = errors.New("failed filter") + return + } + op.unqueried = op.unqueried.Add(n) + op.cond.Broadcast() + return nil +} + +// Add an unqueried node returning an error with a reason why the node wasn't added. +func (op *Operation) AddNode(n types.AddrMaybeId) (err error) { + op.mu.Lock() + defer op.mu.Unlock() + return op.addNodeLocked(n) +} + +// Add a bunch of unqueried nodes at once, returning how many were successfully added. func (op *Operation) AddNodes(nodes []types.AddrMaybeId) (added int) { op.mu.Lock() defer op.mu.Unlock() before := op.unqueried.Len() for _, n := range nodes { - if _, ok := op.queried[addrString(n.Addr.String())]; ok { - continue - } - if !op.input.NodeFilter(n) { - continue - } - op.unqueried = op.unqueried.Add(n) + _ = op.addNodeLocked(n) } - op.cond.Broadcast() return op.unqueried.Len() - before }