Skip to content
This repository was archived by the owner on Mar 26, 2026. It is now read-only.

Commit 232d255

Browse files
committed
feat: add support for async rest streaming methods
1 parent c2eab2d commit 232d255

12 files changed

Lines changed: 103 additions & 47 deletions

File tree

gapic/templates/%namespace/%name_%version/%sub/services/%service/transports/rest_asyncio.py.j2

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ from google.api_core import exceptions as core_exceptions
2222
from google.api_core import gapic_v1
2323
from google.api_core import retry_async as retries
2424
from google.api_core import rest_helpers
25+
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2137): raise an import error if an older version of google.api.core is installed. #}
26+
from google.api_core import rest_streaming_async # type: ignore
2527

2628
try:
2729
from google.api_core import rest_streaming_async # type: ignore
@@ -128,30 +130,27 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):
128130
def __hash__(self):
129131
return hash("Async{{service.name}}RestTransport.{{method.name}}")
130132

131-
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #}
132133
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #}
133134
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #}
134135
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #}
135-
{% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
136+
{% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
136137
{% set body_spec = method.http_options[0].body %}
137138
{{ shared_macros.response_method(body_spec, is_async=True)|indent(8) }}
138139

139-
{% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
140+
{% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
140141
async def __call__(self,
141142
request: {{method.input.ident}}, *,
142143
retry: OptionalRetry=gapic_v1.method.DEFAULT,
143144
timeout: Optional[float]=None,
144145
metadata: Sequence[Tuple[str, str]]=(),
145-
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Update return type for server streaming method. #}
146146
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Update return type for client streaming method. #}
147147
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Update the return type for long running operation method. #}
148148
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Update the return type for pager method. #}
149-
){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}None{% endif %}{% endif %}:
150-
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Implement server streaming method. #}
149+
){% if not method.void %} -> {% if not method.server_streaming %}{{method.output.ident}}{% else %}rest_streaming_async.AsyncResponseIterator{% endif %}{% endif %}:
151150
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2169): Implement client streaming method. #}
152151
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Implement long running operation method. #}
153152
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Implement pager method. #}
154-
{% if method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
153+
{% if method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field %}
155154
r"""Call the {{- ' ' -}}
156155
{{ (method.name|snake_case).replace('_',' ')|wrap(
157156
width=70, offset=45, indent=8) }}
@@ -178,14 +177,18 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):
178177

179178
{% if not method.void %}
180179
# Return the response
180+
{% if method.server_streaming %}
181+
resp = rest_streaming_async.AsyncResponseIterator(response, {{method.output.ident}})
182+
{% else %}
181183
resp = {{method.output.ident}}()
182184
{% if method.output.ident.is_proto_plus_type %}
183185
pb_resp = {{method.output.ident}}.pb(resp)
184186
{% else %}
185187
pb_resp = resp
186-
{% endif %}
188+
{% endif %}{# if method.output.ident.is_proto_plus_type #}
187189
content = await response.read()
188190
json_format.Parse(content, pb_resp, ignore_unknown_fields=True)
191+
{% endif %}{# if method.server_streaming #}
189192
return resp
190193

191194
{% endif %}{# method.void #}
@@ -194,7 +197,7 @@ class Async{{service.name}}RestTransport(_Base{{ service.name }}RestTransport):
194197
raise NotImplementedError(
195198
"Method {{ method.name }} is not available over REST transport"
196199
)
197-
{% endif %}{# method.http_options and not method.client_streaming and not method.server_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
200+
{% endif %}{# method.http_options and not method.client_streaming and not method.lro and not method.extended_lro and not method.paged_result_field #}
198201

199202
{% endfor %}
200203
{% for method in service.methods.values()|sort(attribute="name") %}

gapic/templates/tests/unit/gapic/%name_%version/%sub/test_%service.py.j2

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ except ImportError: # pragma: NO COVER
2020
import grpc
2121
from grpc.experimental import aio
2222
{% if "rest" in opts.transport %}
23-
from collections.abc import Iterable
23+
from collections.abc import Iterable, AsyncIterable
2424
from google.protobuf import json_format
2525
import json
2626
{% endif %}
@@ -114,6 +114,11 @@ from google.iam.v1 import policy_pb2 # type: ignore
114114
{% endfilter %}
115115
{{ shared_macros.add_google_api_core_version_header_import(service.version) }}
116116

117+
async def mock_async_gen(data, chunk_size=1):
118+
for i in range(0, len(data)): # pragma: NO COVER
119+
chunk = data[i : i + chunk_size]
120+
yield chunk.encode("utf-8")
121+
117122
def client_cert_source_callback():
118123
return b"cert bytes", b"key bytes"
119124

gapic/templates/tests/unit/gapic/%name_%version/%sub/test_macros.j2

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,14 +1004,13 @@ def test_{{ method_name }}_raw_page_lro():
10041004
{% with method_name = method.safe_name|snake_case + "_unary" if method.extended_lro and not full_extended_lro else method.name|snake_case, method_output = method.extended_lro.operation_type if method.extended_lro and not full_extended_lro else method.output %}{% if method.http_options %}
10051005
{# TODO(kbandes): remove this if condition when lro and client streaming are supported. #}
10061006
{% if not method.client_streaming %}
1007-
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Remove unit test for server streaming method. #}
10081007
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Remove unit test for long running operation method. #}
10091008
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Remove unit test for pager method. #}
10101009
{# NOTE: This guard is added to avoid generating duplicate tests for methods which are tested elsewhere. As we implement each of the api methods
10111010
# in the `macro::call_success_test`, the case will be removed from this condition below.
10121011
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Remove the test `test_{{ method_name }}_rest` from here once the linked issue is resolved.
10131012
#}
1014-
{% if method.server_streaming or method.lro or method.extended_lro or method.paged_result_field %}
1013+
{% if method.lro or method.extended_lro or method.paged_result_field %}
10151014
@pytest.mark.parametrize("request_type", [
10161015
{{ method.input.ident }},
10171016
dict,
@@ -1914,15 +1913,15 @@ def test_unsupported_parameter_rest_asyncio():
19141913
{% endmacro %}
19151914

19161915
{# is_rest_unsupported_method renders:
1917-
# 'True' if transport is async REST.
1918-
# 'True' if transport is sync REST and method is a client streaming method.
1916+
# 'True' if transport is async REST and method is one of [client_streaming, lro, extended_lro, paged_result_field].
1917+
# 'True' if transport is sync REST and method is a client_streaming method.
19191918
# 'False' otherwise.
19201919
#}
19211920
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2152): Update this method as we add support for methods in async REST.
19221921
# There are no plans to add support for client streaming.
19231922
#}
19241923
{% macro is_rest_unsupported_method(method, is_async) %}
1925-
{%- if method.client_streaming or (is_async and (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)) -%}
1924+
{%- if method.client_streaming or (is_async and (method.lro or method.extended_lro or method.paged_result_field)) -%}
19261925
{{'True'}}
19271926
{%- else -%}
19281927
{{'False'}}
@@ -2062,7 +2061,7 @@ def test_initialize_client_w_{{transport_name}}():
20622061
{# call_success_test generates tests for rest methods
20632062
# when they make a successful request.
20642063
# NOTE: Currently, this macro does not support the following method
2065-
# types: [method.server_streaming, method.lro, method.extended_lro, method.paged_result_field].
2064+
# types: [method.lro, method.extended_lro, method.paged_result_field].
20662065
# As support is added for the above methods, the relevant guard can be removed from within the macro
20672066
# TODO(https://github.com/googleapis/gapic-generator-python/issues/2142): Clean up `rest_required_tests` as we add support for each of the method types metioned above.
20682067
#}
@@ -2076,14 +2075,13 @@ def test_initialize_client_w_{{transport_name}}():
20762075
# (method.extended_lro and not full_extended_lro)
20772076
#}
20782077
{% set method_output = method.output %}
2079-
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2168): Add unit test for server streaming method. #}
20802078
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2170): Add unit test for long running operation method. #}
20812079
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2171): Add unit test for pager method. #}
20822080
{# TODO(https://github.com/googleapis/gapic-generator-python/issues/2143): Update the guard below as we add support for each method, and keep it in sync with the guard in
20832081
# `rest_required_tests`, which should be the exact opposite. Remove it once we have all the methods supported in async rest transport that are supported in sync rest transport.
20842082
#}
2085-
{% if not (method.server_streaming or method.lro or method.extended_lro or method.paged_result_field)%}
2086-
{{async_decorator}}
2083+
{% if not (method.lro or method.extended_lro or method.paged_result_field)%}
2084+
{{ async_decorator }}
20872085
@pytest.mark.parametrize("request_type", [
20882086
{{ method.input.ident }},
20892087
dict,
@@ -2232,14 +2230,33 @@ def test_initialize_client_w_{{transport_name}}():
22322230
{% endif %}{# method.output.ident.is_proto_plus_type #}
22332231
json_return_value = json_format.MessageToJson(return_value)
22342232
{% endif %}{# method.void #}
2233+
{% if method.server_streaming %}
2234+
json_return_value = "[{}]".format(json_return_value)
2235+
{% if is_async %}
2236+
response_value.content.return_value = mock_async_gen(json_return_value)
2237+
{% else %}{# not is_async #}
2238+
response_value.iter_content = mock.Mock(return_value=iter(json_return_value))
2239+
{% endif %}{# is_async #}
2240+
{% else %}{# not method.streaming #}
22352241
{% if is_async %}
22362242
response_value.read = mock.AsyncMock(return_value=json_return_value.encode('UTF-8'))
2237-
{% else %}{# is_async #}
2243+
{% else %}{# not is_async #}
22382244
response_value.content = json_return_value.encode('UTF-8')
22392245
{% endif %}{# is_async #}
2246+
{% endif %}{# method.server_streaming #}
22402247
req.return_value = response_value
22412248
response = {{ await_prefix }}client.{{ method_name }}(request)
2242-
2249+
2250+
{% if method.server_streaming %}
2251+
{% if is_async %}
2252+
assert isinstance(response, AsyncIterable)
2253+
response = await response.__anext__()
2254+
{% else %}
2255+
assert isinstance(response, Iterable)
2256+
response = next(response)
2257+
{% endif %}
2258+
{% endif %}
2259+
22432260
# Establish that the response is the type that we expect.
22442261
{% if method.void %}
22452262
assert response is None

tests/integration/goldens/asset/tests/unit/gapic/asset_v1/test_asset_service.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import grpc
2525
from grpc.experimental import aio
26-
from collections.abc import Iterable
26+
from collections.abc import Iterable, AsyncIterable
2727
from google.protobuf import json_format
2828
import json
2929
import math
@@ -71,6 +71,11 @@
7171
import google.auth
7272

7373

74+
async def mock_async_gen(data, chunk_size=1):
75+
for i in range(0, len(data)): # pragma: NO COVER
76+
chunk = data[i : i + chunk_size]
77+
yield chunk.encode("utf-8")
78+
7479
def client_cert_source_callback():
7580
return b"cert bytes", b"key bytes"
7681

tests/integration/goldens/credentials/tests/unit/gapic/credentials_v1/test_iam_credentials.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import grpc
2525
from grpc.experimental import aio
26-
from collections.abc import Iterable
26+
from collections.abc import Iterable, AsyncIterable
2727
from google.protobuf import json_format
2828
import json
2929
import math
@@ -61,6 +61,11 @@
6161
import google.auth
6262

6363

64+
async def mock_async_gen(data, chunk_size=1):
65+
for i in range(0, len(data)): # pragma: NO COVER
66+
chunk = data[i : i + chunk_size]
67+
yield chunk.encode("utf-8")
68+
6469
def client_cert_source_callback():
6570
return b"cert bytes", b"key bytes"
6671

tests/integration/goldens/eventarc/tests/unit/gapic/eventarc_v1/test_eventarc.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
import grpc
2525
from grpc.experimental import aio
26-
from collections.abc import Iterable
26+
from collections.abc import Iterable, AsyncIterable
2727
from google.protobuf import json_format
2828
import json
2929
import math
@@ -81,6 +81,11 @@
8181
import google.auth
8282

8383

84+
async def mock_async_gen(data, chunk_size=1):
85+
for i in range(0, len(data)): # pragma: NO COVER
86+
chunk = data[i : i + chunk_size]
87+
yield chunk.encode("utf-8")
88+
8489
def client_cert_source_callback():
8590
return b"cert bytes", b"key bytes"
8691

tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_config_service_v2.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@
6161
import google.auth
6262

6363

64+
async def mock_async_gen(data, chunk_size=1):
65+
for i in range(0, len(data)): # pragma: NO COVER
66+
chunk = data[i : i + chunk_size]
67+
yield chunk.encode("utf-8")
68+
6469
def client_cert_source_callback():
6570
return b"cert bytes", b"key bytes"
6671

tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_logging_service_v2.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@
6262
import google.auth
6363

6464

65+
async def mock_async_gen(data, chunk_size=1):
66+
for i in range(0, len(data)): # pragma: NO COVER
67+
chunk = data[i : i + chunk_size]
68+
yield chunk.encode("utf-8")
69+
6570
def client_cert_source_callback():
6671
return b"cert bytes", b"key bytes"
6772

tests/integration/goldens/logging/tests/unit/gapic/logging_v2/test_metrics_service_v2.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@
6060
import google.auth
6161

6262

63+
async def mock_async_gen(data, chunk_size=1):
64+
for i in range(0, len(data)): # pragma: NO COVER
65+
chunk = data[i : i + chunk_size]
66+
yield chunk.encode("utf-8")
67+
6368
def client_cert_source_callback():
6469
return b"cert bytes", b"key bytes"
6570

tests/integration/goldens/redis/google/cloud/redis_v1/services/cloud_redis/transports/rest_asyncio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from google.api_core import gapic_v1
2828
from google.api_core import retry_async as retries
2929
from google.api_core import rest_helpers
30+
from google.api_core import rest_streaming_async # type: ignore
3031

3132
try:
3233
from google.api_core import rest_streaming_async # type: ignore

0 commit comments

Comments
 (0)