Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ python3 -m pip install google-cloud-pubsub

* The script `fixup_pubsub_v1_keywords.py` is shipped with the library. It expects
an input directory (with the code to convert) and an empty destination directory.
Optionally, the `--use-keywords` switch can be added to generate flattened keyword
parameters instead of a request dictionary (see the following section for an
explanation).

```sh
$ scripts/fixup_pubsub_v1_keywords.py --input-directory .samples/ --output-directory samples/
Expand Down
278 changes: 211 additions & 67 deletions scripts/fixup_pubsub_v1_keywords.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import libcst as cst
import pathlib
import sys
from typing import (Any, Callable, Dict, List, Sequence, Tuple)
from typing import Any, Callable, Dict, List, Sequence, Tuple


def partition(
predicate: Callable[[Any], bool],
iterator: Sequence[Any]
predicate: Callable[[Any], bool], iterator: Sequence[Any]
) -> Tuple[List[Any], List[Any]]:
"""A stable, out-of-place partition."""
results = ([], [])
Expand All @@ -38,40 +37,128 @@ def partition(


class pubsubCallTransformer(cst.CSTTransformer):
CTRL_PARAMS: Tuple[str] = ('retry', 'timeout', 'metadata')
CTRL_PARAMS: Tuple[str] = ("retry", "timeout", "metadata")
METHOD_TO_PARAMS: Dict[str, Tuple[str]] = {
'acknowledge': ('subscription', 'ack_ids', ),
'create_snapshot': ('name', 'subscription', 'labels', ),
'create_subscription': ('name', 'topic', 'push_config', 'ack_deadline_seconds', 'retain_acked_messages', 'message_retention_duration', 'labels', 'enable_message_ordering', 'expiration_policy', 'filter', 'dead_letter_policy', 'retry_policy', 'detached', ),
'create_topic': ('name', 'labels', 'message_storage_policy', 'kms_key_name', ),
'delete_snapshot': ('snapshot', ),
'delete_subscription': ('subscription', ),
'delete_topic': ('topic', ),
'detach_subscription': ('subscription', ),
'get_snapshot': ('snapshot', ),
'get_subscription': ('subscription', ),
'get_topic': ('topic', ),
'list_snapshots': ('project', 'page_size', 'page_token', ),
'list_subscriptions': ('project', 'page_size', 'page_token', ),
'list_topics': ('project', 'page_size', 'page_token', ),
'list_topic_snapshots': ('topic', 'page_size', 'page_token', ),
'list_topic_subscriptions': ('topic', 'page_size', 'page_token', ),
'modify_ack_deadline': ('subscription', 'ack_ids', 'ack_deadline_seconds', ),
'modify_push_config': ('subscription', 'push_config', ),
'publish': ('topic', 'messages', ),
'pull': ('subscription', 'max_messages', 'return_immediately', ),
'seek': ('subscription', 'time', 'snapshot', ),
'streaming_pull': ('subscription', 'stream_ack_deadline_seconds', 'ack_ids', 'modify_deadline_seconds', 'modify_deadline_ack_ids', 'client_id', 'max_outstanding_messages', 'max_outstanding_bytes', ),
'update_snapshot': ('snapshot', 'update_mask', ),
'update_subscription': ('subscription', 'update_mask', ),
'update_topic': ('topic', 'update_mask', ),

'get_iam_policy': ('resource', 'options', ),
'set_iam_policy': ('resource', 'policy', ),
'test_iam_permissions': ('resource', 'permissions', ),

"acknowledge": (
"subscription",
"ack_ids",
),
"create_snapshot": (
"name",
"subscription",
"labels",
),
"create_subscription": (
"name",
"topic",
"push_config",
"ack_deadline_seconds",
"retain_acked_messages",
"message_retention_duration",
"labels",
"enable_message_ordering",
"expiration_policy",
"filter",
"dead_letter_policy",
"retry_policy",
"detached",
),
"create_topic": (
"name",
"labels",
"message_storage_policy",
"kms_key_name",
),
"delete_snapshot": ("snapshot",),
"delete_subscription": ("subscription",),
"delete_topic": ("topic",),
"detach_subscription": ("subscription",),
"get_snapshot": ("snapshot",),
"get_subscription": ("subscription",),
"get_topic": ("topic",),
"list_snapshots": (
"project",
"page_size",
"page_token",
),
"list_subscriptions": (
"project",
"page_size",
"page_token",
),
"list_topics": (
"project",
"page_size",
"page_token",
),
"list_topic_snapshots": (
"topic",
"page_size",
"page_token",
),
"list_topic_subscriptions": (
"topic",
"page_size",
"page_token",
),
"modify_ack_deadline": (
"subscription",
"ack_ids",
"ack_deadline_seconds",
),
"modify_push_config": (
"subscription",
"push_config",
),
"pull": (
"subscription",
"max_messages",
"return_immediately",
),
"seek": (
"subscription",
"time",
"snapshot",
),
"streaming_pull": (
"subscription",
"stream_ack_deadline_seconds",
"ack_ids",
"modify_deadline_seconds",
"modify_deadline_ack_ids",
"client_id",
"max_outstanding_messages",
"max_outstanding_bytes",
),
"update_snapshot": (
"snapshot",
"update_mask",
),
"update_subscription": (
"subscription",
"update_mask",
),
"update_topic": (
"topic",
"update_mask",
),
"get_iam_policy": (
"resource",
"options",
),
"set_iam_policy": (
"resource",
"policy",
),
"test_iam_permissions": (
"resource",
"permissions",
),
}

def __init__(self, use_keywords=False):
self._use_keywords = use_keywords

def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
try:
key = original.func.attr.value
Expand All @@ -88,35 +175,80 @@ def leave_Call(self, original: cst.Call, updated: cst.Call) -> cst.CSTNode:
return updated

kwargs, ctrl_kwargs = partition(
lambda a: not a.keyword.value in self.CTRL_PARAMS,
kwargs
lambda a: not a.keyword.value in self.CTRL_PARAMS, kwargs
)

args, ctrl_args = args[:len(kword_params)], args[len(kword_params):]
ctrl_kwargs.extend(cst.Arg(value=a.value, keyword=cst.Name(value=ctrl))
for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS))
args, ctrl_args = args[: len(kword_params)], args[len(kword_params) :]
ctrl_kwargs.extend(
cst.Arg(
value=a.value,
keyword=cst.Name(value=ctrl),
equal=cst.AssignEqual(
whitespace_before=cst.SimpleWhitespace(""),
whitespace_after=cst.SimpleWhitespace(""),
),
)
for a, ctrl in zip(ctrl_args, self.CTRL_PARAMS)
)

request_arg = cst.Arg(
value=cst.Dict([
cst.DictElement(
cst.SimpleString("'{}'".format(name)),
cst.Element(value=arg.value)
if self._use_keywords:
new_kwargs = [
cst.Arg(
value=arg.value,
keyword=cst.Name(value=name),
equal=cst.AssignEqual(
whitespace_before=cst.SimpleWhitespace(""),
whitespace_after=cst.SimpleWhitespace(""),
),
)
# Note: the args + kwargs looks silly, but keep in mind that
# the control parameters had to be stripped out, and that
# those could have been passed positionally or by keyword.
for name, arg in zip(kword_params, args + kwargs)]),
keyword=cst.Name("request")
)
for name, arg in zip(kword_params, args + kwargs)
]
new_kwargs.extend(
[
cst.Arg(
value=arg.value,
keyword=cst.Name(value=arg.keyword.value),
equal=cst.AssignEqual(
whitespace_before=cst.SimpleWhitespace(""),
whitespace_after=cst.SimpleWhitespace(""),
),
)
for arg in ctrl_kwargs
]
)
return updated.with_changes(args=new_kwargs)
else:
request_arg = cst.Arg(
value=cst.Dict(
[
cst.DictElement(
cst.SimpleString('"{}"'.format(name)),
cst.Element(value=arg.value),
)
for name, arg in zip(kword_params, args + kwargs)
]
+ [
cst.DictElement(
cst.SimpleString('"{}"'.format(arg.keyword.value)),
cst.Element(value=arg.value),
)
for arg in ctrl_kwargs
]
),
keyword=cst.Name("request"),
equal=cst.AssignEqual(
whitespace_before=cst.SimpleWhitespace(""),
whitespace_after=cst.SimpleWhitespace(""),
),
)

return updated.with_changes(
args=[request_arg] + ctrl_kwargs
)
return updated.with_changes(args=[request_arg])


def fix_files(
in_dir: pathlib.Path,
out_dir: pathlib.Path,
use_keywords: bool = False,
*,
transformer=pubsubCallTransformer(),
):
Expand All @@ -129,11 +261,12 @@ def fix_files(
pyfile_gen = (
pathlib.Path(os.path.join(root, f))
for root, _, files in os.walk(in_dir)
for f in files if os.path.splitext(f)[1] == ".py"
for f in files
if os.path.splitext(f)[1] == ".py"
)

for fpath in pyfile_gen:
with open(fpath, 'r') as f:
with open(fpath, "r") as f:
src = f.read()

# Parse the code and insert method call fixes.
Expand All @@ -145,11 +278,11 @@ def fix_files(
updated_path.parent.mkdir(parents=True, exist_ok=True)

# Generate the updated source file at the corresponding path.
with open(updated_path, 'w') as f:
with open(updated_path, "w") as f:
f.write(updated.code)


if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Fix up source that uses the pubsub client library.

Expand All @@ -164,24 +297,34 @@ def fix_files(

These all constitute false negatives. The tool will also detect false
positives when an API method shares a name with another method.
""")
"""
)
parser.add_argument(
'-d',
'--input-directory',
"-d",
"--input-directory",
required=True,
dest='input_dir',
help='the input directory to walk for python files to fix up',
dest="input_dir",
help="the input directory to walk for python files to fix up",
)
parser.add_argument(
'-o',
'--output-directory',
"-o",
"--output-directory",
required=True,
dest='output_dir',
help='the directory to output files fixed via un-flattening',
dest="output_dir",
help="the directory to output files fixed via un-flattening",
)
parser.add_argument(
"-k",
"--use-keywords",
required=False,
action="store_true",
dest="use_keywords",
help="Use keyword arguments instead of constructing a request",
)
args = parser.parse_args()
input_dir = pathlib.Path(args.input_dir)
output_dir = pathlib.Path(args.output_dir)
use_keywords = args.use_keywords
if not input_dir.is_dir():
print(
f"input directory '{input_dir}' does not exist or is not a directory",
Expand All @@ -203,4 +346,5 @@ def fix_files(
)
sys.exit(-1)

fix_files(input_dir, output_dir)
transformer = pubsubCallTransformer(use_keywords=use_keywords)
fix_files(input_dir, output_dir, use_keywords, transformer=transformer)