diff --git a/instagram/oauth2.py b/instagram/oauth2.py index b5db0fe9..7d6ec6ba 100644 --- a/instagram/oauth2.py +++ b/instagram/oauth2.py @@ -73,7 +73,7 @@ def _url_for_authorize(self, scope=None): client_params.update(scope=' '.join(scope)) url_params = urlencode(client_params) - return "%s?%s" % (self.api.authorize_url, url_params) + return "{url}?{params}".format(url=self.api.authorize_url, params=url_params) def _data_for_exchange(self, code=None, username=None, password=None, scope=None, user_id=None): client_params = { @@ -106,7 +106,7 @@ def get_authorize_login_url(self, scope=None): url = self._url_for_authorize(scope=scope) response, content = http_object.request(url) if response['status'] != '200': - raise OAuth2AuthExchangeError("The server returned a non-200 response for URL %s" % url) + raise OAuth2AuthExchangeError("The server returned a non-200 response for URL {url}".format(url)) redirected_to = response['Content-Location'] return redirected_to @@ -133,10 +133,10 @@ def __init__(self, api): def _generate_sig(self, endpoint, params, secret): # handle unicode when signing, urlencode can't handle otherwise. def enc_if_str(p): - return p.encode('utf-8') if isinstance(p, unicode) else p + return p.encode('utf-8') if isinstance(p, six.text_type) else p - p = ''.join('|{}={}'.format(k, enc_if_str(params[k])) for k in sorted(params.keys())) - sig = '{}{}'.format(endpoint, p) + path = ''.join('|{key}={val}'.format(key=key, val=enc_if_str(params[key])) for key in sorted(params.keys())) + sig = '{endpoint}{path}'.format(endpoint=endpoint, path=path) return hmac.new(secret.encode(), sig.encode(), sha256).hexdigest() def url_for_get(self, path, parameters): @@ -148,27 +148,30 @@ def get_request(self, path, **kwargs): def post_request(self, path, **kwargs): return self.make_request(self.prepare_request("POST", path, kwargs)) + # TODO - make use of six.moves.urllib.parse.urlparse for all this string munging def _full_url(self, path, include_secret=False, include_signed_request=True): - return "%s://%s%s%s%s%s" % (self.api.protocol, self.api.host, self.api.base_path, path, - self._auth_query(include_secret), - self._signed_request(path, {}, include_signed_request, include_secret)) + signed_request = self._signed_request(path, {}, include_signed_request, include_secret) + return "{protocol}://{host}{basepath}{path}{query}{signed}".format( + protocol=self.api.protocol, host=self.api.host, basepath=self.api.base_path, path=path, + query=self._auth_query(include_secret), signed=signed_request) def _full_url_with_params(self, path, params, include_secret=False, include_signed_request=True): - return (self._full_url(path, include_secret) + - self._full_query_with_params(params) + - self._signed_request(path, params, include_signed_request, include_secret)) + signed_request = self._signed_request(path, params, include_signed_request, include_secret) + return "{url}{query}{signed}".format( + url=self._full_url(path, include_secret), query=self._full_query_with_params(params), signed=signed_request) def _full_query_with_params(self, params): - params = ("&" + urlencode(params)) if params else "" - return params + if not params: + return "" + return "&{params}".format(params=urlencode(params)) def _auth_query(self, include_secret=False): if self.api.access_token: - return ("?%s=%s" % (self.api.access_token_field, self.api.access_token)) + return "?{field}={token}".format(field=self.api.access_token_field, token=self.api.access_token) elif self.api.client_id: - base = ("?client_id=%s" % (self.api.client_id)) + base = "?client_id={client_id}".format(client_id=self.api.client_id) if include_secret: - base += "&client_secret=%s" % (self.api.client_secret) + base += "&client_secret={client_secret}".format(client_secret=self.api.client_secret) return base def _signed_request(self, path, params, include_signed_request, include_secret): @@ -181,7 +184,7 @@ def _signed_request(self, path, params, include_signed_request, include_secret): if include_secret and self.api.client_secret: params['client_secret'] = self.api.client_secret - return "&sig=%s" % self._generate_sig(path, params, self.api.client_secret) + return "&sig={signed}".format(signed=self._generate_sig(path, params, self.api.client_secret)) else: return '' @@ -195,15 +198,16 @@ def get_content_type(file_name): return mimetypes.guess_type(file_name)[0] or "application/octet-stream" def encode_field(field_name): - return ("--" + boundary, - 'Content-Disposition: form-data; name="%s"' % (field_name), + return ("--{boundary}".format(boundary=boundary), + 'Content-Disposition: form-data; name="{field_name}"'.format(field_name=field_name), "", str(params[field_name])) def encode_file(field_name): file_name, file_handle = files[field_name] - return ("--" + boundary, - 'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, file_name), - "Content-Type: " + get_content_type(file_name), + return ("--{boundary}".format(boundary=boundary), + 'Content-Disposition: form-data; name="{field_name}"; filename="{file_name}"'.format( + field_name=field_name, file_name=file_name), + "Content-Type: {content_type}".format(content_type=get_content_type(file_name)), "", file_handle.read()) lines = [] @@ -212,10 +216,10 @@ def encode_file(field_name): for field in files: lines.extend(encode_file(field)) - lines.extend(("--%s--" % (boundary), "")) + lines.extend(("--{boundary}--".format(boundary=boundary), "")) body = "\r\n".join(lines) - headers = {"Content-Type": "multipart/form-data; boundary=" + boundary, + headers = {"Content-Type": "multipart/form-data; boundary={boundary}".format(boundary=boundary), "Content-Length": str(len(body))} return body, headers @@ -243,12 +247,12 @@ def prepare_request(self, method, path, params, include_secret=False): def make_request(self, url, method="GET", body=None, headers=None): headers = headers or {} if 'User-Agent' not in headers: - headers.update({"User-Agent": "%s Python Client" % self.api.api_name}) + headers.update({"User-Agent": "{api_name} Python Client".format(api_name=self.api.api_name)}) # https://github.com/jcgregorio/httplib2/issues/173 # bug in httplib2 w/ Python 3 and disable_ssl_certificate_validation=True if six.PY3: - http_obj = Http(timeout=self.api.timeout) + http_obj = Http(timeout=self.api.timeout) else: http_obj = Http(timeout=self.api.timeout, disable_ssl_certificate_validation=True) diff --git a/tests.py b/tests.py index 8e0bfded..85f5ffe0 100755 --- a/tests.py +++ b/tests.py @@ -95,6 +95,16 @@ def test_xauth_exchange(self): assert access_token +class OAuth2RequestTests(unittest.TestCase): + def setUp(self): + super(OAuth2RequestTests, self).setUp() + self.api = TestInstagramAPI(access_token=access_token) + self.request = oauth2.OAuth2Request(self.api) + + def test_generate_sig(self): + self.request._generate_sig(endpoint='/', params=dict(count=1), secret=client_secret) + + class InstagramAPITests(unittest.TestCase): def setUp(self): super(InstagramAPITests, self).setUp()