Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit 5407e01

Browse files
committed
Make max_duration_per_lease_extension high enough
If flow_control.max_duration_per_lease_extension is set to too low a value, it is adjusted to the minimum ACK deadline.
1 parent 223a986 commit 5407e01

4 files changed

Lines changed: 66 additions & 32 deletions

File tree

google/cloud/pubsub_v1/subscriber/_protocol/histogram.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from __future__ import absolute_import, division
1616

1717

18+
MIN_ACK_DEADLINE = 10
19+
MAX_ACK_DEADLINE = 600
20+
21+
1822
class Histogram(object):
1923
"""Representation of a single histogram.
2024
@@ -96,28 +100,30 @@ def max(self):
96100
def min(self):
97101
"""Return the minimum value in this histogram.
98102
99-
If there are no values in the histogram at all, return 10.
103+
If there are no values in the histogram at all, return the min default.
100104
101105
Returns:
102106
int: The minimum value in the histogram.
103107
"""
104108
if len(self._data) == 0:
105-
return 10
109+
return MIN_ACK_DEADLINE
106110
return next(iter(sorted(self._data.keys())))
107111

108112
def add(self, value):
109113
"""Add the value to this histogram.
110114
111115
Args:
112-
value (int): The value. Values outside of ``10 <= x <= 600``
113-
will be raised to ``10`` or reduced to ``600``.
116+
value (int): The value. Values outside of
117+
``MIN_ACK_DEADLINE <= x <= MAX_ACK_DEADLINE``
118+
will be raised to ``MIN_ACK_DEADLINE`` or reduced to
119+
``MAX_ACK_DEADLINE``.
114120
"""
115121
# If the value is out of bounds, bring it in bounds.
116122
value = int(value)
117-
if value < 10:
118-
value = 10
119-
if value > 600:
120-
value = 600
123+
if value < MIN_ACK_DEADLINE:
124+
value = MIN_ACK_DEADLINE
125+
elif value > MAX_ACK_DEADLINE:
126+
value = MAX_ACK_DEADLINE
121127

122128
# Add the value to the histogram's data dictionary.
123129
self._data.setdefault(value, 0)
@@ -129,7 +135,7 @@ def percentile(self, percent):
129135
130136
Args:
131137
percent (Union[int, float]): The precentile being sought. The
132-
default consumer implementations use consistently use ``99``.
138+
default consumer implementations consistently use ``99``.
133139
134140
Returns:
135141
int: The value corresponding to the requested percentile.
@@ -150,5 +156,5 @@ def percentile(self, percent):
150156
return k
151157

152158
# The only way to get here is if there was no data.
153-
# In this case, just return 10 seconds.
154-
return 10
159+
# In this case, just return the shortest possible deadline.
160+
return MIN_ACK_DEADLINE

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ def __init__(
143143
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
144144
self._ack_histogram = histogram.Histogram()
145145
self._last_histogram_size = 0
146-
self._ack_deadline = 10
146+
self._ack_deadline = histogram.MIN_ACK_DEADLINE
147147
self._rpc = None
148148
self._callback = None
149149
self._closing = threading.Lock()
@@ -248,10 +248,12 @@ def ack_deadline(self):
248248
self._ack_deadline = self.ack_histogram.percentile(percent=99)
249249

250250
if self.flow_control.max_duration_per_lease_extension > 0:
251-
self._ack_deadline = min(
252-
self._ack_deadline,
251+
# The setting in flow control could be too low, adjust if needed.
252+
flow_control_setting = max(
253253
self.flow_control.max_duration_per_lease_extension,
254+
histogram.MIN_ACK_DEADLINE,
254255
)
256+
self._ack_deadline = min(self._ack_deadline, flow_control_setting)
255257
return self._ack_deadline
256258

257259
@property

google/cloud/pubsub_v1/types.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ class LimitExceededBehavior(str, enum.Enum):
152152
FlowControl.max_duration_per_lease_extension.__doc__ = (
153153
"The max amount of time in seconds for a single lease extension attempt. "
154154
"Bounds the delay before a message redelivery if the subscriber "
155-
"fails to extend the deadline."
155+
"fails to extend the deadline. Must be between 10 and 600 (inclusive). Ignored "
156+
"if set to 0."
156157
)
157158

158159

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,49 @@ def fake_add(self, items):
139139
leaser.add = stdlib_types.MethodType(fake_add, leaser)
140140

141141

142-
def test_ack_deadline():
142+
def test_ack_deadline_no_custom_flow_control_setting():
143+
from google.cloud.pubsub_v1.subscriber._protocol import histogram
144+
143145
manager = make_manager()
144-
assert manager.ack_deadline == 10
145-
manager.ack_histogram.add(20)
146-
assert manager.ack_deadline == 20
147-
manager.ack_histogram.add(10)
148-
assert manager.ack_deadline == 20
146+
147+
# Make sure that max_duration_per_lease_extension is disabled.
148+
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=0)
149+
150+
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE
151+
152+
# When we get some historical data, the deadline is adjusted.
153+
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 2)
154+
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2
155+
156+
# Adding just a single additional data point does not yet change the deadline.
157+
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE)
158+
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE * 2
159+
160+
161+
def test_ack_deadline_with_max_duration_per_lease_extension():
162+
from google.cloud.pubsub_v1.subscriber._protocol import histogram
163+
164+
manager = make_manager()
165+
manager._flow_control = types.FlowControl(
166+
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE + 1
167+
)
168+
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large
169+
170+
# The deadline configured in flow control should prevail.
171+
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE + 1
172+
173+
174+
def test_ack_deadline_with_max_duration_per_lease_extension_too_low():
175+
from google.cloud.pubsub_v1.subscriber._protocol import histogram
176+
177+
manager = make_manager()
178+
manager._flow_control = types.FlowControl(
179+
max_duration_per_lease_extension=histogram.MIN_ACK_DEADLINE - 1
180+
)
181+
manager.ack_histogram.add(histogram.MIN_ACK_DEADLINE * 3) # make p99 value large
182+
183+
# The deadline configured in flow control should be adjusted to the minimum allowed.
184+
assert manager.ack_deadline == histogram.MIN_ACK_DEADLINE
149185

150186

151187
def test_client_id():
@@ -181,17 +217,6 @@ def test_streaming_flow_control_use_legacy_flow_control():
181217
assert request.max_outstanding_bytes == 0
182218

183219

184-
def test_ack_deadline_with_max_duration_per_lease_extension():
185-
manager = make_manager()
186-
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)
187-
188-
assert manager.ack_deadline == 5
189-
for _ in range(5):
190-
manager.ack_histogram.add(20)
191-
192-
assert manager.ack_deadline == 5
193-
194-
195220
def test_maybe_pause_consumer_wo_consumer_set():
196221
manager = make_manager(
197222
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)

0 commit comments

Comments
 (0)