Skip to content

Commit bd5bcad

Browse files
authored
refactor(subscriber): message stream retry logic (#607)
1 parent 9c40008 commit bd5bcad

4 files changed

Lines changed: 197 additions & 39 deletions

File tree

handwritten/pubsub/src/message-stream.ts

Lines changed: 20 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -26,38 +26,15 @@ import {
2626
import * as isStreamEnded from 'is-stream-ended';
2727
import {PassThrough} from 'stream';
2828

29-
import {PullResponse, Subscriber} from './subscriber';
29+
import {PullRetry} from './pull-retry';
30+
import {Subscriber} from './subscriber';
31+
import {google} from '../proto/pubsub';
3032

3133
/*!
3234
* Frequency to ping streams.
3335
*/
3436
const KEEP_ALIVE_INTERVAL = 30000;
3537

36-
/*!
37-
* Deadline Exceeded status code
38-
*/
39-
const DEADLINE: status = 4;
40-
41-
/*!
42-
* Unknown status code
43-
*/
44-
const UNKNOWN: status = 2;
45-
46-
/*!
47-
* codes to retry streams
48-
*/
49-
const RETRY_CODES: status[] = [
50-
0, // ok
51-
1, // canceled
52-
2, // unknown
53-
4, // deadline exceeded
54-
8, // resource exhausted
55-
10, // aborted
56-
13, // internal error
57-
14, // unavailable
58-
15, // dataloss
59-
];
60-
6138
/*!
6239
* Deadline for the stream.
6340
*/
@@ -79,14 +56,8 @@ interface StreamState {
7956
highWaterMark: number;
8057
}
8158

82-
interface StreamingPullRequest {
83-
subscription?: string;
84-
ackIds?: string[];
85-
modifyDeadlineSeconds?: number[];
86-
modifyDeadlineAckIds?: string[];
87-
streamAckDeadlineSeconds?: number;
88-
}
89-
59+
type StreamingPullRequest = google.pubsub.v1.IStreamingPullRequest;
60+
type PullResponse = google.pubsub.v1.IPullResponse;
9061
type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse> & {
9162
_readableState: StreamState;
9263
};
@@ -119,7 +90,9 @@ export class ChannelError extends Error implements ServiceError {
11990
code: status;
12091
constructor(err: Error) {
12192
super(`Failed to connect to channel. Reason: ${err.message}`);
122-
this.code = err.message.includes('deadline') ? DEADLINE : UNKNOWN;
93+
this.code = err.message.includes('deadline')
94+
? status.DEADLINE_EXCEEDED
95+
: status.UNKNOWN;
12396
}
12497
}
12598

@@ -154,7 +127,9 @@ export interface MessageStreamOptions {
154127
export class MessageStream extends PassThrough {
155128
destroyed: boolean;
156129
private _keepAliveHandle: NodeJS.Timer;
130+
private _fillHandle?: NodeJS.Timer;
157131
private _options: MessageStreamOptions;
132+
private _retrier: PullRetry;
158133
private _streams: Map<PullStream, boolean>;
159134
private _subscriber: Subscriber;
160135
constructor(sub: Subscriber, options = {} as MessageStreamOptions) {
@@ -164,6 +139,7 @@ export class MessageStream extends PassThrough {
164139

165140
this.destroyed = false;
166141
this._options = options;
142+
this._retrier = new PullRetry();
167143
this._streams = new Map();
168144
this._subscriber = sub;
169145

@@ -253,6 +229,8 @@ export class MessageStream extends PassThrough {
253229
streamAckDeadlineSeconds: this._subscriber.ackDeadline,
254230
};
255231

232+
delete this._fillHandle;
233+
256234
for (let i = this._streams.size; i < this._options.maxStreams!; i++) {
257235
const stream: PullStream = client.streamingPull({deadline});
258236
this._addStream(stream);
@@ -302,8 +280,13 @@ export class MessageStream extends PassThrough {
302280
private _onEnd(stream: PullStream, status: StatusObject): void {
303281
this._removeStream(stream);
304282

305-
if (RETRY_CODES.includes(status.code)) {
306-
this._fillStreamPool();
283+
if (this._fillHandle) {
284+
return;
285+
}
286+
287+
if (this._retrier.retry(status)) {
288+
const delay = this._retrier.createTimeout();
289+
this._fillHandle = setTimeout(() => this._fillStreamPool(), delay);
307290
} else if (!this._streams.size) {
308291
this.destroy(new StatusError(status));
309292
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*!
2+
* Copyright 2019 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import {StatusObject, status} from 'grpc';
17+
18+
/*!
19+
* retryable status codes
20+
*/
21+
export const RETRY_CODES: status[] = [
22+
status.OK,
23+
status.CANCELLED,
24+
status.UNKNOWN,
25+
status.DEADLINE_EXCEEDED,
26+
status.RESOURCE_EXHAUSTED,
27+
status.ABORTED,
28+
status.INTERNAL,
29+
status.UNAVAILABLE,
30+
status.DATA_LOSS,
31+
];
32+
33+
/**
34+
* Used to track pull requests and determine if additional requests should be
35+
* made, etc.
36+
*
37+
* @class
38+
* @private
39+
*/
40+
export class PullRetry {
41+
private failures = 0;
42+
/**
43+
* Generates a timeout that can be used for applying a backoff based on the
44+
* current number of failed requests.
45+
*
46+
* @see {@link https://cloud.google.com/iot/docs/how-tos/exponential-backoff}
47+
* @private
48+
* @returns {number}
49+
*/
50+
createTimeout(): number {
51+
if (this.failures === 0) {
52+
return 0;
53+
}
54+
return Math.pow(2, this.failures) * 1000 + Math.floor(Math.random() * 1000);
55+
}
56+
/**
57+
* Determines if a request status should be retried.
58+
*
59+
* Deadlines behave kind of unexpectedly on streams, rather than using it as
60+
* an indicator of when to give up trying to connect, it actually dictates
61+
* how long the stream should stay open. Because of this, it is virtually
62+
* impossible to determine whether or not a deadline error is the result of
63+
* the server closing the stream or if we timed out waiting for a connection.
64+
*
65+
* @private
66+
* @param {object} status The request status.
67+
* @returns {boolean}
68+
*/
69+
retry(err: StatusObject): boolean {
70+
if (err.code === status.OK || err.code === status.DEADLINE_EXCEEDED) {
71+
this.failures = 0;
72+
} else {
73+
this.failures += 1;
74+
}
75+
76+
return RETRY_CODES.includes(err.code);
77+
}
78+
}

handwritten/pubsub/test/message-stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ describe('MessageStream', () => {
506506

507507
stream.push(null);
508508
setImmediate(() => {
509-
assert.strictEqual(client.streams.length, 6);
509+
assert.strictEqual(client.streams.length, 5);
510510
done();
511511
});
512512
});
@@ -524,7 +524,7 @@ describe('MessageStream', () => {
524524
assert.strictEqual(stream.listenerCount('end'), count);
525525

526526
setImmediate(() => {
527-
assert.strictEqual(client.streams.length, 6);
527+
assert.strictEqual(client.streams.length, 5);
528528
done();
529529
});
530530
});
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright 2019 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import assert = require('assert');
18+
import sinon = require('sinon');
19+
import {StatusObject, status} from 'grpc';
20+
import {PullRetry} from '../src/pull-retry';
21+
22+
describe('PullRetry', () => {
23+
const sandbox = sinon.createSandbox();
24+
25+
let retrier: PullRetry;
26+
27+
beforeEach(() => {
28+
retrier = new PullRetry();
29+
});
30+
31+
afterEach(() => {
32+
sandbox.restore();
33+
});
34+
35+
describe('createTimeout', () => {
36+
it('should return 0 when no failures have occurred', () => {
37+
assert.strictEqual(retrier.createTimeout(), 0);
38+
});
39+
40+
it('should use a backoff factoring in the failure count', () => {
41+
const random = Math.random();
42+
const expected = Math.pow(2, 1) * 1000 + Math.floor(random * 1000);
43+
44+
sandbox.stub(global.Math, 'random').returns(random);
45+
46+
retrier.retry({code: status.CANCELLED} as StatusObject);
47+
assert.strictEqual(retrier.createTimeout(), expected);
48+
});
49+
});
50+
51+
describe('retry', () => {
52+
it('should return true for retryable errors', () => {
53+
[
54+
status.OK,
55+
status.CANCELLED,
56+
status.UNKNOWN,
57+
status.DEADLINE_EXCEEDED,
58+
status.RESOURCE_EXHAUSTED,
59+
status.ABORTED,
60+
status.INTERNAL,
61+
status.UNAVAILABLE,
62+
status.DATA_LOSS,
63+
].forEach((code: status) => {
64+
const shouldRetry = retrier.retry({code} as StatusObject);
65+
assert.strictEqual(shouldRetry, true);
66+
});
67+
});
68+
69+
it('should return false for non-retryable errors', () => {
70+
[
71+
status.INVALID_ARGUMENT,
72+
status.NOT_FOUND,
73+
status.PERMISSION_DENIED,
74+
status.FAILED_PRECONDITION,
75+
status.OUT_OF_RANGE,
76+
status.UNIMPLEMENTED,
77+
].forEach((code: status) => {
78+
const shouldRetry = retrier.retry({code} as StatusObject);
79+
assert.strictEqual(shouldRetry, false);
80+
});
81+
});
82+
83+
it('should reset the failure count on OK', () => {
84+
retrier.retry({code: status.CANCELLED} as StatusObject);
85+
retrier.retry({code: status.OK} as StatusObject);
86+
87+
assert.strictEqual(retrier.createTimeout(), 0);
88+
});
89+
90+
it('should reset the failure count on DEADLINE_EXCEEDED', () => {
91+
retrier.retry({code: status.CANCELLED} as StatusObject);
92+
retrier.retry({code: status.DEADLINE_EXCEEDED} as StatusObject);
93+
94+
assert.strictEqual(retrier.createTimeout(), 0);
95+
});
96+
});
97+
});

0 commit comments

Comments
 (0)