Skip to content

Commit 38b4c2c

Browse files
committed
Merge pull request #36 from reactphp/collection-cancellation
Cancellation of promise collections
2 parents f2b2a12 + ba63a61 commit 38b4c2c

13 files changed

Lines changed: 586 additions & 81 deletions

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,11 @@ $promise->then(function ($value) {
446446
Useful functions for creating, joining, mapping and reducing collections of
447447
promises.
448448

449+
All functions working on promise collections (like `all()`, `race()`, `some()`
450+
etc.) support cancellation. This means, if you call `cancel()` on the returned
451+
promise, all promises in the collection are cancelled. If the collection itself
452+
is a promise which resolves to an array, this promise is also cancelled.
453+
449454
#### resolve()
450455

451456
```php

composer.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@
1414
},
1515
"files": ["src/functions_include.php"]
1616
},
17+
"autoload-dev": {
18+
"psr-4": {
19+
"React\\Promise\\": "tests/fixtures"
20+
}
21+
},
1722
"extra": {
1823
"branch-alias": {
1924
"dev-master": "2.0-dev"

src/CancellationQueue.php

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
namespace React\Promise;
4+
5+
class CancellationQueue
6+
{
7+
private $started = false;
8+
9+
/**
10+
* @var CancellablePromiseInterface[]
11+
*/
12+
private $queue = [];
13+
14+
public function __invoke()
15+
{
16+
if ($this->started) {
17+
return;
18+
}
19+
20+
$this->started = true;
21+
$this->drain();
22+
}
23+
24+
public function enqueue($promise)
25+
{
26+
if (!$promise instanceof CancellablePromiseInterface) {
27+
return;
28+
}
29+
30+
$length = array_push($this->queue, $promise);
31+
32+
if ($this->started && 1 === $length) {
33+
$this->drain();
34+
}
35+
}
36+
37+
private function drain()
38+
{
39+
for ($i = key($this->queue); isset($this->queue[$i]); $i++) {
40+
$promise = $this->queue[$i];
41+
42+
$exception = null;
43+
44+
try {
45+
$promise->cancel();
46+
} catch (\Exception $exception) {
47+
}
48+
49+
unset($this->queue[$i]);
50+
51+
if ($exception) {
52+
throw $exception;
53+
}
54+
}
55+
56+
$this->queue = [];
57+
}
58+
}

src/functions.php

Lines changed: 88 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -37,19 +37,35 @@ function all($promisesOrValues)
3737

3838
function race($promisesOrValues)
3939
{
40-
return resolve($promisesOrValues)
41-
->then(function ($array) {
42-
if (!is_array($array) || !$array) {
43-
return resolve();
44-
}
40+
$cancellationQueue = new CancellationQueue();
41+
$cancellationQueue->enqueue($promisesOrValues);
42+
43+
return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $cancellationQueue) {
44+
resolve($promisesOrValues)
45+
->done(function ($array) use ($cancellationQueue, $resolve, $reject, $notify) {
46+
if (!is_array($array) || !$array) {
47+
$resolve();
48+
return;
49+
}
4550

46-
return new Promise(function ($resolve, $reject, $notify) use ($array) {
4751
foreach ($array as $promiseOrValue) {
52+
$cancellationQueue->enqueue($promiseOrValue);
53+
54+
$fulfiller = function ($value) use ($cancellationQueue, $resolve) {
55+
$cancellationQueue();
56+
$resolve($value);
57+
};
58+
59+
$rejecter = function ($reason) use ($cancellationQueue, $reject) {
60+
$cancellationQueue();
61+
$reject($reason);
62+
};
63+
4864
resolve($promiseOrValue)
49-
->done($resolve, $reject, $notify);
65+
->done($fulfiller, $rejecter, $notify);
5066
}
51-
});
52-
});
67+
}, $reject, $notify);
68+
}, $cancellationQueue);
5369
}
5470

5571
function any($promisesOrValues)
@@ -62,28 +78,33 @@ function any($promisesOrValues)
6278

6379
function some($promisesOrValues, $howMany)
6480
{
65-
return resolve($promisesOrValues)
66-
->then(function ($array) use ($howMany) {
67-
if (!is_array($array) || !$array || $howMany < 1) {
68-
return resolve([]);
69-
}
81+
$cancellationQueue = new CancellationQueue();
82+
$cancellationQueue->enqueue($promisesOrValues);
83+
84+
return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $howMany, $cancellationQueue) {
85+
resolve($promisesOrValues)
86+
->done(function ($array) use ($howMany, $cancellationQueue, $resolve, $reject, $notify) {
87+
if (!is_array($array) || !$array || $howMany < 1) {
88+
$resolve([]);
89+
return;
90+
}
7091

71-
return new Promise(function ($resolve, $reject, $notify) use ($array, $howMany) {
7292
$len = count($array);
7393
$toResolve = min($howMany, $len);
7494
$toReject = ($len - $toResolve) + 1;
7595
$values = [];
7696
$reasons = [];
7797

7898
foreach ($array as $i => $promiseOrValue) {
79-
$fulfiller = function ($val) use ($i, &$values, &$toResolve, $toReject, $resolve) {
99+
$fulfiller = function ($val) use ($i, &$values, &$toResolve, $toReject, $resolve, $cancellationQueue) {
80100
if ($toResolve < 1 || $toReject < 1) {
81101
return;
82102
}
83103

84104
$values[$i] = $val;
85105

86106
if (0 === --$toResolve) {
107+
$cancellationQueue();
87108
$resolve($values);
88109
}
89110
};
@@ -100,26 +121,34 @@ function some($promisesOrValues, $howMany)
100121
}
101122
};
102123

124+
$cancellationQueue->enqueue($promiseOrValue);
125+
103126
resolve($promiseOrValue)
104127
->done($fulfiller, $rejecter, $notify);
105128
}
106-
});
107-
});
129+
}, $reject, $notify);
130+
}, $cancellationQueue);
108131
}
109132

110133
function map($promisesOrValues, callable $mapFunc)
111134
{
112-
return resolve($promisesOrValues)
113-
->then(function ($array) use ($mapFunc) {
114-
if (!is_array($array) || !$array) {
115-
return resolve([]);
116-
}
135+
$cancellationQueue = new CancellationQueue();
136+
$cancellationQueue->enqueue($promisesOrValues);
137+
138+
return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $mapFunc, $cancellationQueue) {
139+
resolve($promisesOrValues)
140+
->done(function ($array) use ($mapFunc, $cancellationQueue, $resolve, $reject, $notify) {
141+
if (!is_array($array) || !$array) {
142+
$resolve([]);
143+
return;
144+
}
117145

118-
return new Promise(function ($resolve, $reject, $notify) use ($array, $mapFunc) {
119146
$toResolve = count($array);
120147
$values = [];
121148

122149
foreach ($array as $i => $promiseOrValue) {
150+
$cancellationQueue->enqueue($promiseOrValue);
151+
123152
resolve($promiseOrValue)
124153
->then($mapFunc)
125154
->done(
@@ -134,35 +163,45 @@ function ($mapped) use ($i, &$values, &$toResolve, $resolve) {
134163
$notify
135164
);
136165
}
137-
});
138-
});
166+
}, $reject, $notify);
167+
}, $cancellationQueue);
139168
}
140169

141170
function reduce($promisesOrValues, callable $reduceFunc, $initialValue = null)
142171
{
143-
return resolve($promisesOrValues)
144-
->then(function ($array) use ($reduceFunc, $initialValue) {
145-
if (!is_array($array)) {
146-
$array = [];
147-
}
148-
149-
$total = count($array);
150-
$i = 0;
151-
152-
// Wrap the supplied $reduceFunc with one that handles promises and then
153-
// delegates to the supplied.
154-
$wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $total, &$i) {
155-
return resolve($current)
156-
->then(function ($c) use ($reduceFunc, $total, &$i, $val) {
157-
return resolve($val)
158-
->then(function ($value) use ($reduceFunc, $total, &$i, $c) {
159-
return $reduceFunc($c, $value, $i++, $total);
160-
});
161-
});
162-
};
163-
164-
return array_reduce($array, $wrappedReduceFunc, $initialValue);
165-
});
172+
$cancellationQueue = new CancellationQueue();
173+
$cancellationQueue->enqueue($promisesOrValues);
174+
175+
return new Promise(function ($resolve, $reject, $notify) use ($promisesOrValues, $reduceFunc, $initialValue, $cancellationQueue) {
176+
resolve($promisesOrValues)
177+
->done(function ($array) use ($reduceFunc, $initialValue, $cancellationQueue, $resolve, $reject, $notify) {
178+
if (!is_array($array)) {
179+
$array = [];
180+
}
181+
182+
$total = count($array);
183+
$i = 0;
184+
185+
// Wrap the supplied $reduceFunc with one that handles promises and then
186+
// delegates to the supplied.
187+
$wrappedReduceFunc = function ($current, $val) use ($reduceFunc, $cancellationQueue, $total, &$i) {
188+
$cancellationQueue->enqueue($val);
189+
190+
return $current
191+
->then(function ($c) use ($reduceFunc, $total, &$i, $val) {
192+
return resolve($val)
193+
->then(function ($value) use ($reduceFunc, $total, &$i, $c) {
194+
return $reduceFunc($c, $value, $i++, $total);
195+
});
196+
});
197+
};
198+
199+
$cancellationQueue->enqueue($initialValue);
200+
201+
array_reduce($array, $wrappedReduceFunc, resolve($initialValue))
202+
->done($resolve, $reject, $notify);
203+
}, $reject, $notify);
204+
}, $cancellationQueue);
166205
}
167206

168207
// Internal functions

tests/CancellationQueueTest.php

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
namespace React\Promise;
4+
5+
class CancellationQueueTest extends TestCase
6+
{
7+
/** @test */
8+
public function ignoresNonCancellablePromises()
9+
{
10+
$p = new SimpleFulfilledTestPromise();
11+
12+
$cancellationQueue = new CancellationQueue();
13+
$cancellationQueue->enqueue($p);
14+
15+
$cancellationQueue();
16+
17+
$this->assertFalse($p->cancelCalled);
18+
}
19+
20+
/** @test */
21+
public function callsCancelOnPromisesEnqueuedBeforeStart()
22+
{
23+
$d1 = $this->getCancellableDeferred();
24+
$d2 = $this->getCancellableDeferred();
25+
26+
$cancellationQueue = new CancellationQueue();
27+
$cancellationQueue->enqueue($d1->promise());
28+
$cancellationQueue->enqueue($d2->promise());
29+
30+
$cancellationQueue();
31+
}
32+
33+
/** @test */
34+
public function callsCancelOnPromisesEnqueuedAfterStart()
35+
{
36+
$d1 = $this->getCancellableDeferred();
37+
$d2 = $this->getCancellableDeferred();
38+
39+
$cancellationQueue = new CancellationQueue();
40+
41+
$cancellationQueue();
42+
43+
$cancellationQueue->enqueue($d2->promise());
44+
$cancellationQueue->enqueue($d1->promise());
45+
}
46+
47+
/** @test */
48+
public function doesNotCallCancelTwiceWhenStartedTwice()
49+
{
50+
$d = $this->getCancellableDeferred();
51+
52+
$cancellationQueue = new CancellationQueue();
53+
$cancellationQueue->enqueue($d->promise());
54+
55+
$cancellationQueue();
56+
$cancellationQueue();
57+
}
58+
59+
/** @test */
60+
public function rethrowsExceptionsThrownFromCancel()
61+
{
62+
$this->setExpectedException('\Exception', 'test');
63+
64+
$mock = $this->getMock('React\Promise\CancellablePromiseInterface');
65+
$mock
66+
->expects($this->once())
67+
->method('cancel')
68+
->will($this->throwException(new \Exception('test')));
69+
70+
$cancellationQueue = new CancellationQueue();
71+
$cancellationQueue->enqueue($mock);
72+
73+
$cancellationQueue();
74+
}
75+
76+
private function getCancellableDeferred()
77+
{
78+
$mock = $this->createCallableMock();
79+
$mock
80+
->expects($this->once())
81+
->method('__invoke');
82+
83+
return new Deferred($mock);
84+
}
85+
}

0 commit comments

Comments
 (0)