Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pubsub/cloud-client/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def get_subscription_policy(project, subscription_name):
print("Policy for subscription {}:".format(subscription_path))
for binding in policy.bindings:
print("Role: {}, Members: {}".format(binding.role, binding.members))

client.close()
# [END pubsub_get_subscription_policy]


Expand Down Expand Up @@ -101,6 +103,8 @@ def set_subscription_policy(project, subscription_name):
subscription_name, policy
)
)

client.close()
# [END pubsub_set_subscription_policy]


Expand Down Expand Up @@ -144,6 +148,8 @@ def check_subscription_permissions(project, subscription_name):
subscription_path, allowed_permissions
)
)

client.close()
# [END pubsub_test_subscription_permissions]


Expand Down
4 changes: 3 additions & 1 deletion pubsub/cloud-client/iam_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def topic(publisher_client):

@pytest.fixture(scope="module")
def subscriber_client():
yield pubsub_v1.SubscriberClient()
subscriber_client = pubsub_v1.SubscriberClient()
yield subscriber_client
subscriber_client.close()


@pytest.fixture
Expand Down
3 changes: 3 additions & 0 deletions pubsub/cloud-client/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def callback(message):
else:
# Sleeps the thread at 50Hz to save on resources.
time.sleep(1.0 / 50)

# Release subscriber's underlying resources.
subscriber.close()
# [END pubsub_end_to_end]


Expand Down
14 changes: 8 additions & 6 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
try:
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
Copy link
Copy Markdown
Member

@anguillanneuf anguillanneuf Feb 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be with client.

try:
# Calling result() on StreamingPullFuture keeps the main thread from
# exiting while messages get processed in the callbacks.
streaming_pull_future.result()
except: # noqa
streaming_pull_future.cancel()


if __name__ == "__main__":
Expand Down
3 changes: 3 additions & 0 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def subscription_path(topic_path):
yield subscription_path

subscriber_client.delete_subscription(subscription_path)
subscriber_client.close()


def _publish_messages(topic_path):
Expand Down Expand Up @@ -102,3 +103,5 @@ def mock_result():
out, _ = capsys.readouterr()
assert "Received message" in out
assert "Acknowledged message" in out

real_client.close()
1 change: 1 addition & 0 deletions pubsub/cloud-client/quickstart_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def subscription(subscriber_client, topic):
yield SUBSCRIPTION

subscriber_client.delete_subscription(subscription_path)
subscriber_client.close()


def test_end_to_end(topic, subscription, capsys):
Expand Down
58 changes: 40 additions & 18 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def list_subscriptions_in_project(project_id):

for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)

subscriber.close()
# [END pubsub_list_subscriptions]


Expand All @@ -75,6 +77,8 @@ def create_subscription(project_id, topic_name, subscription_name):
)

print("Subscription created: {}".format(subscription))

subscriber.close()
# [END pubsub_create_pull_subscription]


Expand Down Expand Up @@ -104,6 +108,8 @@ def create_push_subscription(

print("Push subscription created: {}".format(subscription))
print("Endpoint for subscription is: {}".format(endpoint))

subscriber.close()
# [END pubsub_create_push_subscription]


Expand All @@ -123,6 +129,8 @@ def delete_subscription(project_id, subscription_name):
subscriber.delete_subscription(subscription_path)

print("Subscription deleted: {}".format(subscription_path))

subscriber.close()
# [END pubsub_delete_subscription]


Expand Down Expand Up @@ -158,6 +166,8 @@ def update_subscription(project_id, subscription_name, endpoint):

print("Subscription updated: {}".format(subscription_path))
print("New endpoint for subscription is: {}".format(result.push_config))

subscriber.close()
# [END pubsub_update_push_configuration]


Expand Down Expand Up @@ -188,12 +198,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# result() in a future will block indefinitely if `timeout` is not
# set, unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]

Expand Down Expand Up @@ -230,12 +242,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# result() in a future will block indefinitely if `timeout` is not
# set, unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull_custom_attributes]
# [END pubsub_subscriber_sync_pull_custom_attributes]

Expand Down Expand Up @@ -269,12 +283,14 @@ def callback(message):
)
print("Listening for messages on {}..\n".format(subscription_path))

# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# result() in a future will block indefinitely if `timeout` is not
# set, unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_flow_settings]


Expand Down Expand Up @@ -309,6 +325,8 @@ def synchronous_pull(project_id, subscription_name):
len(response.received_messages)
)
)

subscriber.close()
# [END pubsub_subscriber_sync_pull]


Expand Down Expand Up @@ -398,6 +416,8 @@ def worker(msg):
len(response.received_messages)
)
)

subscriber.close()
# [END pubsub_subscriber_sync_pull_with_lease]


Expand Down Expand Up @@ -436,6 +456,8 @@ def callback(message):
subscription_name, e
)
)

subscriber.close()
# [END pubsub_subscriber_error_listener]


Expand Down
4 changes: 3 additions & 1 deletion pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ def topic(publisher_client):

@pytest.fixture(scope="module")
def subscriber_client():
yield pubsub_v1.SubscriberClient()
subscriber_client = pubsub_v1.SubscriberClient()
yield subscriber_client
subscriber_client.close()


@pytest.fixture(scope="module")
Expand Down