@@ -24,6 +24,63 @@ use std::{
2424 } ,
2525} ;
2626
/// An entry in `SessionManager::node_id_index`: the slab token of a session
/// together with the direction in which that session was established
/// (`originated = true` for the local node's own outbound, `originated =
/// false` for an inbound from the remote).
///
/// `originated` is stored here — alongside the token — so that the
/// simultaneous-dial tie-breaker in `update_ingress_node_id` can read it
/// without acquiring the per-session `RwLock<Session>`. Acquiring that lock
/// while holding `node_id_index.write()` would deadlock with any other
/// thread that holds `Session::write()` and is itself trying to call
/// `update_ingress_node_id` (which is a common case in the simultaneous-dial
/// scenario the tie-breaker exists to fix).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct IndexEntry {
    /// Slab token of the session this entry refers to.
    pub token: usize,
    /// `true` if this session is our outbound to the remote (egress);
    /// `false` if it is the remote's outbound to us (ingress).
    pub originated: bool,
}
45+
/// Outcome of `SessionManager::update_ingress_node_id`.
///
/// Each variant tells the caller which connection, if any, it is expected to
/// disconnect after the index update.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UpdateIngressResult {
    /// There was no prior entry for the node id; the new ingress is now in
    /// the index.
    Inserted,
    /// A prior entry was replaced by the new ingress. The payload is the
    /// token of the replaced session. Caller should disconnect the old token.
    Replaced(usize),
    /// Caller should disconnect the new ingress it was about to
    /// register — the existing entry won the simultaneous-dial tie-break.
    DropNew,
}
56+
/// Outcome of the simultaneous-dial tie-breaker for an (existing, new-ingress)
/// pair sharing the same remote `NodeId`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SimDialOutcome {
    /// The new ingress session survives and supersedes the existing entry.
    KeepNew,
    /// The existing entry survives; the new ingress is the one to drop.
    KeepExisting,
}
64+
65+ /// Deterministically resolve a simultaneous dial: keep the connection where
66+ /// the higher-`NodeId` peer is the dialer. Both sides compute the same
67+ /// comparison over the same two `NodeId`s and converge on the same surviving
68+ /// TCP connection.
69+ pub fn simultaneous_dial_outcome (
70+ own_node_id : & NodeId , remote_node_id : & NodeId , existing_originated : bool ,
71+ ) -> SimDialOutcome {
72+ if !existing_originated {
73+ // Existing is also an ingress (an older inbound from the same peer):
74+ // the fresher inbound replaces it.
75+ return SimDialOutcome :: KeepNew ;
76+ }
77+ if own_node_id < remote_node_id {
78+ SimDialOutcome :: KeepNew
79+ } else {
80+ SimDialOutcome :: KeepExisting
81+ }
82+ }
83+
2784/// Session manager maintains all ingress and egress TCP connections in thread
2885/// safe manner.
2986///
@@ -46,8 +103,15 @@ pub struct SessionManager {
46103 max_ingress_sessions : usize ,
47104 cur_ingress_sessions : AtomicUsize ,
48105
49- /// session indices
50- node_id_index : RwLock < HashMap < NodeId , usize > > ,
106+ /// The local node's NodeId, used by `update_ingress_node_id` to
107+ /// compute the simultaneous-dial tie-break.
108+ own_node_id : NodeId ,
109+
110+ /// session indices. Each entry stores the slab token plus whether
111+ /// that session is egress (`originated = true`) or ingress
112+ /// (`originated = false`); the latter is consulted by the
113+ /// simultaneous-dial tie-breaker in `update_ingress_node_id`.
114+ node_id_index : RwLock < HashMap < NodeId , IndexEntry > > ,
51115 ip_limit : RwLock < Box < dyn SessionIpLimit > > ,
52116 tag_index : RwLock < SessionTagIndex > ,
53117 /// pos public key
@@ -59,7 +123,7 @@ impl SessionManager {
59123 /// Create a new instance.
60124 pub fn new (
61125 offset : usize , capacity : usize , max_ingress_sessions : usize ,
62- ip_limit_config : & SessionIpLimitConfig ,
126+ own_node_id : NodeId , ip_limit_config : & SessionIpLimitConfig ,
63127 self_pos_public_key : Option < (
64128 ConsensusPublicKey ,
65129 ConsensusVRFPublicKey ,
@@ -71,6 +135,7 @@ impl SessionManager {
71135 capacity,
72136 max_ingress_sessions,
73137 cur_ingress_sessions : AtomicUsize :: new ( 0 ) ,
138+ own_node_id,
74139 node_id_index : RwLock :: new ( HashMap :: new ( ) ) ,
75140 ip_limit : RwLock :: new ( new_session_ip_limit ( ip_limit_config) ) ,
76141 tag_index : Default :: default ( ) ,
@@ -89,8 +154,8 @@ impl SessionManager {
89154 /// Get the session of specified node id.
90155 pub fn get_by_id ( & self , node_id : & NodeId ) -> Option < Arc < RwLock < Session > > > {
91156 let sessions = self . sessions . read ( ) ;
92- let idx = * self . node_id_index . read ( ) . get ( node_id) ?;
93- sessions. get ( idx ) . cloned ( )
157+ let entry = * self . node_id_index . read ( ) . get ( node_id) ?;
158+ sessions. get ( entry . token ) . cloned ( )
94159 }
95160
96161 /// Get all the sessions in `SessionManager`.
@@ -142,7 +207,7 @@ impl SessionManager {
142207
143208 /// Get the session index by node id.
144209 pub fn get_index_by_id ( & self , id : & NodeId ) -> Option < usize > {
145- self . node_id_index . read ( ) . get ( id) . cloned ( )
210+ self . node_id_index . read ( ) . get ( id) . map ( |entry| entry . token )
146211 }
147212
148213 /// Check if the specified IP address is allowed to create a new session.
@@ -225,7 +290,14 @@ impl SessionManager {
225290
226291 // update on creation succeeded
227292 if let Some ( node_id) = id {
228- node_id_index. insert ( node_id. clone ( ) , index) ;
293+ // egress: id is Some at construction
294+ node_id_index. insert (
295+ node_id. clone ( ) ,
296+ IndexEntry {
297+ token : index,
298+ originated : true ,
299+ } ,
300+ ) ;
229301 }
230302
231303 assert ! ( ip_limit. add( ip) ) ;
@@ -250,8 +322,8 @@ impl SessionManager {
250322 sessions. remove ( session. token ( ) ) ;
251323 if let Some ( node_id) = session. id ( ) {
252324 let mut node_id_index = self . node_id_index . write ( ) ;
253- if let Some ( token ) = node_id_index. get ( node_id) {
254- if * token == session. token ( ) {
325+ if let Some ( entry ) = node_id_index. get ( node_id) {
326+ if entry . token == session. token ( ) {
255327 node_id_index. remove ( node_id) ;
256328 }
257329 }
@@ -271,15 +343,23 @@ impl SessionManager {
271343 debug ! ( "SessionManager.remove: leave" ) ;
272344 }
273345
274- /// Update the node id index for ingress session.
275- /// Return error if the session index does not exist, or the node id already
276- /// in use by other session.
277- /// Return optional to-be-disconnected token if no error happens.
346+ /// Update the node id index for an ingress session whose HELLO has
347+ /// just been processed and the remote `node_id` is now known.
348+ ///
349+ /// Returns:
350+ /// - `Err(_)` if the session at `idx` is no longer in the slab (the session
351+ /// was killed concurrently — caller should drop the new ingress).
352+ /// - `Ok(Inserted)` — no prior entry for this `node_id`; the new ingress is
353+ /// now in the index.
354+ /// - `Ok(Replaced(old_token))` — there was a prior entry; it has been
355+ /// replaced. Caller should disconnect `old_token`.
356+ /// - `Ok(DropNew)` — there was a prior entry that won the simultaneous-dial
357+ /// tie-break; the index is unchanged. Caller should disconnect the new
358+ /// ingress (`idx`) it was about to register.
278359 pub fn update_ingress_node_id (
279360 & self , idx : usize , node_id : & NodeId ,
280- ) -> Result < Option < usize > , String > {
361+ ) -> Result < UpdateIngressResult , String > {
281362 debug ! ( "SessionManager.update_ingress_node_id: enter" ) ;
282- let mut token_to_disconnect = None ;
283363
284364 let sessions = self . sessions . read ( ) ;
285365 let mut node_id_index = self . node_id_index . write ( ) ;
@@ -293,21 +373,42 @@ impl SessionManager {
293373 ) ) ;
294374 }
295375
296- // ensure the node id is unique
297- if let Some ( cur_idx) = node_id_index. get ( node_id) {
298- debug ! ( "SessionManager.update_ingress_node_id: leave on node_id already exists" ) ;
299- if * cur_idx != idx {
300- token_to_disconnect = Some ( * cur_idx) ;
301- } else {
376+ if let Some ( existing) = node_id_index. get ( node_id) . copied ( ) {
377+ if existing. token == idx {
302378 panic ! ( "The same token already exists for the same node!!!" ) ;
303379 }
380+ match simultaneous_dial_outcome (
381+ & self . own_node_id ,
382+ node_id,
383+ existing. originated ,
384+ ) {
385+ SimDialOutcome :: KeepNew => {
386+ node_id_index. insert (
387+ node_id. clone ( ) ,
388+ IndexEntry {
389+ token : idx,
390+ originated : false ,
391+ } ,
392+ ) ;
393+ debug ! ( "SessionManager.update_ingress_node_id: leave (replaced)" ) ;
394+ Ok ( UpdateIngressResult :: Replaced ( existing. token ) )
395+ }
396+ SimDialOutcome :: KeepExisting => {
397+ debug ! ( "SessionManager.update_ingress_node_id: leave (drop new — tie-break lost)" ) ;
398+ Ok ( UpdateIngressResult :: DropNew )
399+ }
400+ }
401+ } else {
402+ node_id_index. insert (
403+ node_id. clone ( ) ,
404+ IndexEntry {
405+ token : idx,
406+ originated : false ,
407+ } ,
408+ ) ;
409+ debug ! ( "SessionManager.update_ingress_node_id: leave (inserted)" ) ;
410+ Ok ( UpdateIngressResult :: Inserted )
304411 }
305-
306- node_id_index. insert ( node_id. clone ( ) , idx) ;
307-
308- debug ! ( "SessionManager.update_ingress_node_id: leave" ) ;
309-
310- Ok ( token_to_disconnect)
311412 }
312413}
313414
@@ -390,7 +491,38 @@ impl SessionTagIndex {
390491
391492#[ cfg( test) ]
392493mod tests {
393- use crate :: session_manager:: SessionTagIndex ;
494+ use crate :: {
495+ node_table:: NodeId ,
496+ session_manager:: {
497+ simultaneous_dial_outcome, SessionTagIndex , SimDialOutcome ,
498+ } ,
499+ } ;
500+
#[test]
fn simdial_two_nodes_converge_on_one_connection() {
    // Both sides of a simultaneous dial compute the same comparison
    // over the same two NodeIds and must converge on the same
    // surviving TCP connection — exactly one side keeps its existing
    // egress, the other side drops its own and accepts the new
    // ingress. Run both views of the same (A, B) pair and assert the
    // XOR property, then pin the direction of the rule.
    let a = NodeId::from_low_u64_be(1);
    let b = NodeId::from_low_u64_be(2);
    // The direction assertions below rely on `a` being the lower id.
    assert!(a < b, "test precondition: a must order below b");

    // A's view: existing = A's egress to B (originated=true)
    let a_outcome = simultaneous_dial_outcome(&a, &b, true);
    // B's view: existing = B's egress to A (originated=true)
    let b_outcome = simultaneous_dial_outcome(&b, &a, true);

    let a_keeps = matches!(a_outcome, SimDialOutcome::KeepExisting);
    let b_keeps = matches!(b_outcome, SimDialOutcome::KeepExisting);
    assert!(
        a_keeps ^ b_keeps,
        "exactly one side must keep its egress; got a={:?} b={:?}",
        a_outcome,
        b_outcome
    );

    // The XOR alone would still hold if the comparison were inverted, so
    // also pin the direction: the connection dialed by the higher-NodeId
    // peer (B) survives. A therefore accepts the new ingress from B, and B
    // keeps its existing egress to A.
    assert_eq!(a_outcome, SimDialOutcome::KeepNew);
    assert_eq!(b_outcome, SimDialOutcome::KeepExisting);
}
394526
395527 #[ test]
396528 fn test_tag_index ( ) {
0 commit comments