@@ -26,7 +26,7 @@ import * as claimsUtils from '../claims/utils';
2626import * as agentPB from '../proto/js/Agent_pb' ;
2727import { GRPCClientAgent } from '../agent' ;
2828import { ForwardProxy , ReverseProxy } from '../network' ;
29- import { Mutex } from 'async-mutex' ;
29+ import { Mutex , MutexInterface } from 'async-mutex' ;
3030import {
3131 CreateDestroyStartStop ,
3232 ready ,
@@ -421,68 +421,78 @@ class NodeManager {
421421 /**
422422 * Treat this node as the client, and attempt to create/retrieve an existing
423423 * undirectional connection to another node (server).
424+ * ObjectMap pattern adapted from:
425+ * https://gist.github.com/CMCDragonkai/f58f08e7eaab0430ed4467ca35527a42
424426 */
425427 @ready ( new nodesErrors . ErrorNodeManagerNotStarted ( ) )
426428 public async getConnectionToNode (
427429 targetNodeId : NodeId ,
428430 ) : Promise < NodeConnection > {
429- const connLock = this . connections . get ( targetNodeId ) ;
430- // If there's already an entry in the map, we have 2 cases:
431- // 1. The connection already exists
432- // 2. The connection is currently being created by another concurrent thread
433- if ( connLock != null ) {
434- // Return the connection if it already exists
435- if ( connLock . connection != null ) {
436- return connLock . connection ;
431+ let connection : NodeConnection | undefined ;
432+ let lock : MutexInterface ;
433+ let connAndLock = this . connections . get ( targetNodeId ) ;
434+ if ( connAndLock != null ) {
435+ ( { connection, lock } = connAndLock ) ;
436+ if ( connection != null ) {
437+ return connection ;
437438 }
438- // Otherwise, it's expected to be currently being created by some other thread
439- // Wait for the lock to release
440439 let release ;
441440 try {
442- release = await connLock . lock . acquire ( ) ;
441+ release = await lock . acquire ( ) ;
442+ ( { connection, lock } = connAndLock ) ;
443+ if ( connection != null ) {
444+ return connection ;
445+ }
446+ connection = await this . establishNodeConnection ( targetNodeId , lock ) ;
447+ connAndLock . connection = connection ;
448+ return connection ;
443449 } finally {
444450 release ( ) ;
445451 }
446- // Once the lock is released, then it's sufficient to recursively call the
447- // function. It will most likely enter the case where we already have an
448- // entry in the map (or, an error occurred, and the entry is removed - in
449- // which case, this thread will create the connection).
450- return await this . getConnectionToNode ( targetNodeId ) ;
451-
452- // Otherwise, we need to create an entry
453452 } else {
454- const lock = new Mutex ( ) ;
455- this . connections . set ( targetNodeId , { lock } ) ;
453+ lock = new Mutex ( ) ;
454+ connAndLock = { lock } ;
455+ this . connections . set ( targetNodeId , connAndLock ) ;
456456 let release ;
457457 try {
458458 release = await lock . acquire ( ) ;
459- const targetAddress = await this . findNode ( targetNodeId ) ;
460- const connection = await NodeConnection . createNodeConnection ( {
461- targetNodeId : targetNodeId ,
462- targetHost : targetAddress . ip ,
463- targetPort : targetAddress . port ,
464- forwardProxy : this . fwdProxy ,
465- keyManager : this . keyManager ,
466- logger : this . logger ,
467- } ) ;
468- await connection . start ( {
469- brokerConnections : this . brokerNodeConnections ,
470- } ) ;
471- // Add it to the map of active connections
472- this . connections . set ( targetNodeId , { connection, lock } ) ;
459+ connection = await this . establishNodeConnection ( targetNodeId , lock ) ;
460+ connAndLock . connection = connection ;
473461 return connection ;
474- } catch ( e ) {
475- // We need to make sure to delete any added lock if we encounter an error
476- // Otherwise, we can enter a state where we have a lock in the map, but
477- // no NodeConnection being created
478- this . connections . delete ( targetNodeId ) ;
479- throw e ;
480462 } finally {
481463 release ( ) ;
482464 }
483465 }
484466 }
485467
468+
469+ /**
470+ * Strictly a helper function for this.getConnectionToNode. Do not call this
471+ * function anywhere else.
472+ * To create a connection to a node, always use getConnectionToNode.
473+ */
474+ @ready ( new nodesErrors . ErrorNodeManagerNotStarted ( ) )
475+ protected async establishNodeConnection (
476+ targetNodeId : NodeId ,
477+ lock : MutexInterface
478+ ) : Promise < NodeConnection > {
479+ const targetAddress = await this . findNode ( targetNodeId ) ;
480+ const connection = await NodeConnection . createNodeConnection ( {
481+ targetNodeId : targetNodeId ,
482+ targetHost : targetAddress . ip ,
483+ targetPort : targetAddress . port ,
484+ forwardProxy : this . fwdProxy ,
485+ keyManager : this . keyManager ,
486+ logger : this . logger ,
487+ } ) ;
488+ await connection . start ( {
489+ brokerConnections : this . brokerNodeConnections ,
490+ } ) ;
491+ // Add it to the map of active connections
492+ this . connections . set ( targetNodeId , { connection, lock } ) ;
493+ return connection ;
494+ }
495+
486496 /**
487497 * Create and start a connection to a broker node. Assumes that a direct
488498 * connection to the broker can be established (i.e. no hole punching required).
0 commit comments