Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *contentRouter) Ready() bool {

// readProviderResponses reads peer records (and bitswap records for legacy
// compatibility) from the iterator into the given channel.
func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) {
func readProviderResponses(ctx context.Context, iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) {
defer close(ch)
defer iter.Close()
for iter.Next() {
Expand All @@ -140,10 +140,14 @@ func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.Ad
addrs = append(addrs, a.Multiaddr)
}

ch <- peer.AddrInfo{
select {
case <-ctx.Done():
return
case ch <- peer.AddrInfo{
ID: *result.ID,
Addrs: addrs,
Addrs: addrs}:
}

//lint:ignore SA1019 // ignore staticcheck
case types.SchemaBitswap:
//lint:ignore SA1019 // ignore staticcheck
Expand All @@ -162,9 +166,12 @@ func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.Ad
addrs = append(addrs, a.Multiaddr)
}

ch <- peer.AddrInfo{
select {
case <-ctx.Done():
return
case ch <- peer.AddrInfo{
ID: *result.ID,
Addrs: addrs,
Addrs: addrs}:
}
}
}
Expand All @@ -179,7 +186,7 @@ func (c *contentRouter) FindProvidersAsync(ctx context.Context, key cid.Cid, num
return ch
}
ch := make(chan peer.AddrInfo)
go readProviderResponses(resultsIter, ch)
go readProviderResponses(ctx, resultsIter, ch)
return ch
}

Expand Down