4747import java .util .concurrent .CopyOnWriteArraySet ;
4848import java .util .concurrent .atomic .AtomicBoolean ;
4949import java .util .concurrent .atomic .AtomicReference ;
50+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
5051
5152public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
5253
@@ -71,7 +72,8 @@ public class PipeConsensusReceiverAgent implements ConsensusPipeReceiver {
7172 ConsensusGroupId , Map <ConsensusPipeName , AtomicReference <PipeConsensusReceiver >>>
7273 replicaReceiverMap = new ConcurrentHashMap <>();
7374
74- private final Set <ConsensusPipeName > aliveReceivers = new CopyOnWriteArraySet <>();
75+ private final ReentrantReadWriteLock receiverLifeCircleLock = new ReentrantReadWriteLock ();
76+ private final Set <Integer > staleRegions = new CopyOnWriteArraySet <>();
7577
7678 private PipeConsensus pipeConsensus ;
7779
@@ -131,43 +133,49 @@ public TPipeConsensusTransferResp receive(TPipeConsensusTransferReq req) {
131133
132134 private PipeConsensusReceiver getReceiver (
133135 ConsensusGroupId consensusGroupId , int leaderDataNodeId , byte reqVersion ) {
134- // 1. Route to given consensusGroup's receiver map
135- Map <ConsensusPipeName , AtomicReference <PipeConsensusReceiver >> consensusPipe2ReceiverMap =
136- replicaReceiverMap .computeIfAbsent (consensusGroupId , key -> new ConcurrentHashMap <>());
137- // 2. Route to given consensusPipeTask's receiver
138- ConsensusPipeName consensusPipeName =
139- new ConsensusPipeName (consensusGroupId , leaderDataNodeId , thisNodeId );
140- AtomicBoolean isFirstGetReceiver = new AtomicBoolean (false );
141- AtomicReference <PipeConsensusReceiver > receiverReference =
142- consensusPipe2ReceiverMap .computeIfAbsent (
143- consensusPipeName ,
144- key -> {
145- isFirstGetReceiver .set (true );
146- return new AtomicReference <>(null );
147- });
148-
149- // 3. If not first get receiver && receiver is not alive, return null.
150- if (!isFirstGetReceiver .get () && !aliveReceivers .contains (consensusPipeName )) {
151- return null ;
152- }
136+ // Try to not block concurrent execution of receive() while ensuring sequential execution of
137+ // creating receiver and releasing receiver by using writeReadLock.
138+ receiverLifeCircleLock .readLock ().lock ();
139+ try {
140+ // If data region is stale, return null.
141+ if (staleRegions .contains (consensusGroupId .getId ())) {
142+ return null ;
143+ }
144+ // 1. Route to given consensusGroup's receiver map
145+ Map <ConsensusPipeName , AtomicReference <PipeConsensusReceiver >> consensusPipe2ReceiverMap =
146+ replicaReceiverMap .computeIfAbsent (consensusGroupId , key -> new ConcurrentHashMap <>());
147+ // 2. Route to given consensusPipeTask's receiver
148+ ConsensusPipeName consensusPipeName =
149+ new ConsensusPipeName (consensusGroupId , leaderDataNodeId , thisNodeId );
150+ AtomicBoolean isFirstGetReceiver = new AtomicBoolean (false );
151+ AtomicReference <PipeConsensusReceiver > receiverReference =
152+ consensusPipe2ReceiverMap .computeIfAbsent (
153+ consensusPipeName ,
154+ key -> {
155+ isFirstGetReceiver .set (true );
156+ return new AtomicReference <>(null );
157+ });
153158
154- if (receiverReference .get () == null ) {
155- return internalSetAndGetReceiver (
156- consensusGroupId , consensusPipeName , reqVersion , isFirstGetReceiver );
157- }
159+ if (receiverReference .get () == null ) {
160+ return internalSetAndGetReceiver (
161+ consensusGroupId , consensusPipeName , reqVersion , isFirstGetReceiver );
162+ }
158163
159- final byte receiverThreadLocalVersion = receiverReference .get ().getVersion ().getVersion ();
160- if (receiverThreadLocalVersion != reqVersion ) {
161- LOGGER .warn (
162- "The pipeConsensus request version {} is different from the sender request version {},"
163- + " the receiver will be reset to the sender request version." ,
164- receiverThreadLocalVersion ,
165- reqVersion );
166- receiverReference .set (null );
167- return internalSetAndGetReceiver (
168- consensusGroupId , consensusPipeName , reqVersion , isFirstGetReceiver );
164+ final byte receiverThreadLocalVersion = receiverReference .get ().getVersion ().getVersion ();
165+ if (receiverThreadLocalVersion != reqVersion ) {
166+ LOGGER .warn (
167+ "The pipeConsensus request version {} is different from the sender request version {},"
168+ + " the receiver will be reset to the sender request version." ,
169+ receiverThreadLocalVersion ,
170+ reqVersion );
171+ receiverReference .set (null );
172+ return internalSetAndGetReceiver (
173+ consensusGroupId , consensusPipeName , reqVersion , isFirstGetReceiver );
174+ }
175+ return receiverReference .get ();
176+ } finally {
177+ receiverLifeCircleLock .readLock ().unlock ();
169178 }
170- return receiverReference .get ();
171179 }
172180
173181 private PipeConsensusReceiver internalSetAndGetReceiver (
@@ -184,8 +192,6 @@ private PipeConsensusReceiver internalSetAndGetReceiver(
184192
185193 if (isFirstGetReceiver .get ()) {
186194 if (RECEIVER_CONSTRUCTORS .containsKey (reqVersion )) {
187- // If is first get receiver, set this receiver as alive.
188- aliveReceivers .add (consensusPipeName );
189195 receiverReference .set (
190196 RECEIVER_CONSTRUCTORS
191197 .get (reqVersion )
@@ -215,34 +221,40 @@ private void waitUntilReceiverGetInitiated(
215221 }
216222 }
217223
218- /** Release all receivers of given data region */
224+ /** Release all receivers of given data region when this region is deleted */
219225 @ Override
220226 public final void releaseReceiverResource (DataRegionId dataRegionId ) {
221- // 1. Route to given consensusGroup's receiver map
222- Map <ConsensusPipeName , AtomicReference <PipeConsensusReceiver >> consensusPipe2ReciverMap =
223- this .replicaReceiverMap .getOrDefault (
224- ConsensusGroupId .Factory .create (
225- TConsensusGroupType .DataRegion .getValue (), dataRegionId .getId ()),
226- new ConcurrentHashMap <>());
227- // 2. Release all related receivers
228- consensusPipe2ReciverMap .entrySet ().stream ()
229- .filter (entry -> entry .getKey ().getReceiverDataNodeId () == thisNodeId )
230- .forEach (
231- receiverEntry -> {
232- ConsensusPipeName consensusPipeName = receiverEntry .getKey ();
233- AtomicReference <PipeConsensusReceiver > receiverReference = receiverEntry .getValue ();
234- if (receiverReference != null ) {
235- // no longer receive new request
236- aliveReceivers .remove (consensusPipeName );
237- receiverReference .get ().handleExit ();
238- receiverReference .set (null );
239- }
240- });
241- // 3. Release replica map
242- this .replicaReceiverMap .remove (dataRegionId );
243- // 4. GC receiver map
244- consensusPipe2ReciverMap .clear ();
245- LOGGER .info ("All Receivers related to {} are released." , dataRegionId );
227+ receiverLifeCircleLock .writeLock ().lock ();
228+ try {
229+ // Mark this region as stale first, indicating that this region can not create new receivers
230+ // since it has been deleted.
231+ staleRegions .add (dataRegionId .getId ());
232+ // 1. Route to given consensusGroup's receiver map
233+ Map <ConsensusPipeName , AtomicReference <PipeConsensusReceiver >> consensusPipe2ReciverMap =
234+ this .replicaReceiverMap .getOrDefault (
235+ ConsensusGroupId .Factory .create (
236+ TConsensusGroupType .DataRegion .getValue (), dataRegionId .getId ()),
237+ new ConcurrentHashMap <>());
238+ // 2. Release all related receivers
239+ consensusPipe2ReciverMap .entrySet ().stream ()
240+ .filter (entry -> entry .getKey ().getReceiverDataNodeId () == thisNodeId )
241+ .forEach (
242+ receiverEntry -> {
243+ ConsensusPipeName consensusPipeName = receiverEntry .getKey ();
244+ AtomicReference <PipeConsensusReceiver > receiverReference = receiverEntry .getValue ();
245+ if (receiverReference != null ) {
246+ receiverReference .get ().handleExit ();
247+ receiverReference .set (null );
248+ }
249+ });
250+ // 3. Release replica map
251+ this .replicaReceiverMap .remove (dataRegionId );
252+ // 4. GC receiver map
253+ consensusPipe2ReciverMap .clear ();
254+ LOGGER .info ("All Receivers related to {} are released." , dataRegionId );
255+ } finally {
256+ receiverLifeCircleLock .writeLock ().unlock ();
257+ }
246258 }
247259
248260 public final void closeReceiverExecutor () {
0 commit comments