Skip to content

Commit 8a5c2d5

Browse files
thinkAfCodGrapeBaBa
authored andcommitted
fix: concurrent map read and writes
1 parent a679321 commit 8a5c2d5

3 files changed

Lines changed: 54 additions & 8 deletions

File tree

p2p/discover/portal_protocol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (p *PortalProtocol) setupUDPListening() error {
326326
func(buf []byte, addr *net.UDPAddr) (int, error) {
327327
p.Log.Info("will send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))
328328

329-
if n, ok := p.DiscV5.cachedAddrNode[addr.String()]; ok {
329+
if n, ok := p.DiscV5.GetCachedNode(addr.String()); ok {
330330
//_, err := p.DiscV5.TalkRequestToID(id, addr, string(portalwire.UTPNetwork), buf)
331331
req := &v5wire.TalkRequest{Protocol: string(portalwire.Utp), Message: buf}
332332
p.DiscV5.sendFromAnotherThreadWithNode(n, netip.AddrPortFrom(netutil.IPToAddr(addr.IP), uint16(addr.Port)), req)

p2p/discover/v5_udp.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ type UDPv5 struct {
6464
// static fields
6565
conn UDPConn
6666
tab *Table
67-
cachedIds map[enode.ID]*enode.Node
67+
nodeMu sync.Mutex
6868
cachedAddrNode map[string]*enode.Node
6969
netrestrict *netutil.Netlist
7070
priv *ecdsa.PrivateKey
@@ -155,7 +155,6 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
155155
// static fields
156156
conn: newMeteredConn(conn),
157157
cachedAddrNode: make(map[string]*enode.Node),
158-
cachedIds: make(map[enode.ID]*enode.Node),
159158
localNode: ln,
160159
db: ln.Database(),
161160
netrestrict: cfg.NetRestrict,
@@ -729,8 +728,7 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet,
729728
return nonce, err
730729
}
731730
if c != nil && c.Node != nil {
732-
t.cachedIds[toID] = c.Node
733-
t.cachedAddrNode[toAddr.String()] = c.Node
731+
t.putCache(toAddr.String(), c.Node)
734732
}
735733

736734
_, err = t.conn.WriteToUDPAddrPort(enc, toAddr)
@@ -793,8 +791,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error {
793791
if fromNode != nil {
794792
// Handshake succeeded, add to table.
795793
t.tab.addInboundNode(fromNode)
796-
t.cachedIds[fromID] = fromNode
797-
t.cachedAddrNode[fromAddr.String()] = fromNode
794+
t.putCache(fromAddr.String(), fromNode)
798795
}
799796
if packet.Kind() != v5wire.WhoareyouPacket {
800797
// WHOAREYOU logged separately to report errors.
@@ -999,3 +996,19 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
999996
}
1000997
return resp
1001998
}
999+
1000+
func (t *UDPv5) putCache(addr string, node *enode.Node) {
1001+
t.nodeMu.Lock()
1002+
defer t.nodeMu.Unlock()
1003+
if n, ok := t.cachedAddrNode[addr]; ok {
1004+
t.log.Debug("Update cached node", "old", n.ID(), "new", node.ID())
1005+
}
1006+
t.cachedAddrNode[addr] = node
1007+
}
1008+
1009+
func (t *UDPv5) GetCachedNode(addr string) (*enode.Node, bool) {
1010+
t.nodeMu.Lock()
1011+
defer t.nodeMu.Unlock()
1012+
n, ok := t.cachedAddrNode[addr]
1013+
return n, ok
1014+
}

portalnetwork/history/storage.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const (
3434
deleteSql = "DELETE FROM kvstore WHERE key = (?1);"
3535
containSql = "SELECT 1 FROM kvstore WHERE key = (?1);"
3636
getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;"
37+
getFarthestDistanceSql = "SELECT key, xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC Limit 1;"
3738
deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1"
3839
XorFindFarthestQuery = `SELECT
3940
xor(key, (?1)) as distance
@@ -117,8 +118,8 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora
117118
}
118119

119120
err = hs.initStmts()
120-
121121
// Check whether we already have data, and use it to set radius
122+
hs.setRadiusToFarthestDistance()
122123

123124
// necessary to test NetworkName==history because state also initialize HistoryStorage
124125
if strings.ToLower(config.NetworkName) == "history" {
@@ -376,6 +377,38 @@ func (p *ContentStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256
376377
return currentRadius, nil
377378
}
378379

380+
func (p *ContentStorage) setRadiusToFarthestDistance() {
381+
rows, err := p.sqliteDB.Query(getFarthestDistanceSql, p.nodeId[:])
382+
if err != nil {
383+
p.log.Error("failed to query farthest distance ", "err", err)
384+
return
385+
}
386+
defer func(rows *sql.Rows) {
387+
if rows != nil {
388+
return
389+
}
390+
err = rows.Close()
391+
if err != nil {
392+
p.log.Error("failed to close rows", "err", err)
393+
}
394+
}(rows)
395+
396+
if rows.Next() {
397+
var contentId []byte
398+
var distance []byte
399+
err = rows.Scan(&contentId, &distance)
400+
if err != nil {
401+
p.log.Error("failed to scan rows for farthest distance", "err", err)
402+
}
403+
dis := uint256.NewInt(0)
404+
err = dis.UnmarshalSSZ(distance)
405+
if err != nil {
406+
p.log.Error("failed to unmarshal ssz for farthest distance", "err", err)
407+
}
408+
p.radius.Store(dis)
409+
}
410+
}
411+
379412
func (p *ContentStorage) deleteContentFraction(fraction float64) (deleteCount int, err error) {
380413
if fraction <= 0 || fraction >= 1 {
381414
return deleteCount, errors.New("fraction should be between 0 and 1")

0 commit comments

Comments
 (0)