3232import org .slf4j .Logger ;
3333import org .slf4j .LoggerFactory ;
3434
35+ import javax .annotation .Nullable ;
3536import java .io .IOException ;
3637import java .util .List ;
3738import java .util .stream .Collectors ;
@@ -79,97 +80,64 @@ public ContainerInfo getContainer(final long size,
7980 So we can use different kind of policies.
8081 */
8182
82- ContainerInfo containerInfo = null ;
8383 String failureReason = null ;
8484
8585 //TODO we need to continue the refactor to use repConfig everywhere
8686 //in downstream managers.
8787
88+ PipelineRequestInformation req =
89+ PipelineRequestInformation .Builder .getBuilder ().setSize (size ).build ();
8890
89- while (true ) {
90- List <Pipeline > availablePipelines ;
91- Pipeline pipeline ;
92- // Acquire pipeline manager lock, to avoid any updates to pipeline
93- // while allocate container happens. This is to avoid scenario like
94- // mentioned in HDDS-5655.
95- pipelineManager .acquireReadLock ();
96- try {
97- availablePipelines =
98- findPipelinesByState (repConfig ,
99- excludeList ,
100- Pipeline .PipelineState .OPEN );
101- if (availablePipelines .size () != 0 ) {
102- containerInfo = selectContainer (availablePipelines , size , owner ,
103- excludeList );
104- }
105- if (containerInfo != null ) {
106- return containerInfo ;
107- }
108- } finally {
109- pipelineManager .releaseReadLock ();
110- }
91+ ContainerInfo containerInfo =
92+ getContainer (repConfig , owner , excludeList , req );
93+ if (containerInfo != null ) {
94+ return containerInfo ;
95+ }
11196
112- if (availablePipelines .size () == 0 ) {
97+ try {
98+ // TODO: #CLUTIL Remove creation logic when all replication types
99+ // and factors are handled by pipeline creator
100+ Pipeline pipeline = pipelineManager .createPipeline (repConfig );
101+
102+ // wait until pipeline is ready
103+ pipelineManager .waitPipelineReady (pipeline .getId (), 0 );
104+
105+ } catch (SCMException se ) {
106+ LOG .warn ("Pipeline creation failed for repConfig {} " +
107+ "Datanodes may be used up. Try to see if any pipeline is in " +
108+ "ALLOCATED state, and then will wait for it to be OPEN" ,
109+ repConfig , se );
110+ List <Pipeline > allocatedPipelines = findPipelinesByState (repConfig ,
111+ excludeList ,
112+ Pipeline .PipelineState .ALLOCATED );
113+ if (!allocatedPipelines .isEmpty ()) {
114+ List <PipelineID > allocatedPipelineIDs =
115+ allocatedPipelines .stream ()
116+ .map (p -> p .getId ())
117+ .collect (Collectors .toList ());
113118 try {
114- // TODO: #CLUTIL Remove creation logic when all replication types
115- // and factors are handled by pipeline creator
116- pipeline = pipelineManager .createPipeline (repConfig );
117-
118- // wait until pipeline is ready
119- pipelineManager .waitPipelineReady (pipeline .getId (), 0 );
120-
121- } catch (SCMException se ) {
122- LOG .warn ("Pipeline creation failed for repConfig {} " +
123- "Datanodes may be used up. Try to see if any pipeline is in " +
124- "ALLOCATED state, and then will wait for it to be OPEN" ,
125- repConfig , se );
126- List <Pipeline > allocatedPipelines = findPipelinesByState (repConfig ,
127- excludeList ,
128- Pipeline .PipelineState .ALLOCATED );
129- if (!allocatedPipelines .isEmpty ()) {
130- List <PipelineID > allocatedPipelineIDs =
131- allocatedPipelines .stream ()
132- .map (p -> p .getId ())
133- .collect (Collectors .toList ());
134- try {
135- pipelineManager
136- .waitOnePipelineReady (allocatedPipelineIDs , 0 );
137- } catch (IOException e ) {
138- LOG .warn ("Waiting for one of pipelines {} to be OPEN failed. " ,
139- allocatedPipelineIDs , e );
140- failureReason = "Waiting for one of pipelines to be OPEN failed. "
141- + e .getMessage ();
142- }
143- } else {
144- failureReason = se .getMessage ();
145- }
119+ pipelineManager
120+ .waitOnePipelineReady (allocatedPipelineIDs , 0 );
146121 } catch (IOException e ) {
147- LOG .warn ("Pipeline creation failed for repConfig: {}. "
148- + "Retrying get pipelines call once." , repConfig , e );
149- failureReason = e .getMessage ();
150- }
151-
152- pipelineManager .acquireReadLock ();
153- try {
154- // If Exception occurred or successful creation of pipeline do one
155- // final try to fetch pipelines.
156- availablePipelines = findPipelinesByState (repConfig ,
157- excludeList ,
158- Pipeline .PipelineState .OPEN );
159- if (availablePipelines .size () == 0 ) {
160- LOG .info ("Could not find available pipeline of repConfig: {} "
161- + "even after retrying" , repConfig );
162- break ;
163- }
164- containerInfo = selectContainer (availablePipelines , size , owner ,
165- excludeList );
166- if (containerInfo != null ) {
167- return containerInfo ;
168- }
169- } finally {
170- pipelineManager .releaseReadLock ();
122+ LOG .warn ("Waiting for one of pipelines {} to be OPEN failed. " ,
123+ allocatedPipelineIDs , e );
124+ failureReason = "Waiting for one of pipelines to be OPEN failed. "
125+ + e .getMessage ();
171126 }
127+ } else {
128+ failureReason = se .getMessage ();
172129 }
130+ } catch (IOException e ) {
131+ LOG .warn ("Pipeline creation failed for repConfig: {}. "
132+ + "Retrying get pipelines call once." , repConfig , e );
133+ failureReason = e .getMessage ();
134+ }
135+
136+ // If Exception occurred or successful creation of pipeline do one
137+ // final try to fetch pipelines.
138+ containerInfo = getContainer (repConfig , owner , excludeList , req );
139+ if (containerInfo != null ) {
140+ return containerInfo ;
173141 }
174142
175143 // we have tried all strategies we know but somehow we are not able
@@ -182,6 +150,22 @@ public ContainerInfo getContainer(final long size,
182150 + ", replicationConfig: " + repConfig + ". " + failureReason );
183151 }
184152
153+ @ Nullable
154+ private ContainerInfo getContainer (ReplicationConfig repConfig , String owner ,
155+ ExcludeList excludeList , PipelineRequestInformation req ) {
156+ // Acquire pipeline manager lock, to avoid any updates to pipeline
157+ // while allocate container happens. This is to avoid scenario like
158+ // mentioned in HDDS-5655.
159+ pipelineManager .acquireReadLock ();
160+ try {
161+ List <Pipeline > availablePipelines = findPipelinesByState (repConfig ,
162+ excludeList , Pipeline .PipelineState .OPEN );
163+ return selectContainer (availablePipelines , req , owner , excludeList );
164+ } finally {
165+ pipelineManager .releaseReadLock ();
166+ }
167+ }
168+
185169 private List <Pipeline > findPipelinesByState (
186170 final ReplicationConfig repConfig ,
187171 final ExcludeList excludeList ,
@@ -197,23 +181,26 @@ private List<Pipeline> findPipelinesByState(
197181 return pipelines ;
198182 }
199183
200- private ContainerInfo selectContainer (List <Pipeline > availablePipelines ,
201- long size , String owner , ExcludeList excludeList ) {
202- Pipeline pipeline ;
203- ContainerInfo containerInfo ;
184+ private @ Nullable ContainerInfo selectContainer (
185+ List <Pipeline > availablePipelines , PipelineRequestInformation req ,
186+ String owner , ExcludeList excludeList ) {
204187
205- PipelineRequestInformation pri =
206- PipelineRequestInformation .Builder .getBuilder ().setSize (size )
207- .build ();
208- pipeline = pipelineChoosePolicy .choosePipeline (
209- availablePipelines , pri );
188+ while (!availablePipelines .isEmpty ()) {
189+ Pipeline pipeline = pipelineChoosePolicy .choosePipeline (
190+ availablePipelines , req );
210191
211- // look for OPEN containers that match the criteria.
212- containerInfo = containerManager .getMatchingContainer (size , owner ,
213- pipeline , excludeList .getContainerIds ());
192+ // look for OPEN containers that match the criteria.
193+ final ContainerInfo containerInfo = containerManager .getMatchingContainer (
194+ req . getSize (), owner , pipeline , excludeList .getContainerIds ());
214195
215- return containerInfo ;
196+ if (containerInfo != null ) {
197+ return containerInfo ;
198+ }
199+
200+ availablePipelines .remove (pipeline );
201+ }
216202
203+ return null ;
217204 }
218205
219206}
0 commit comments