Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions instagram/oauth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _url_for_authorize(self, scope=None):
client_params.update(scope=' '.join(scope))

url_params = urlencode(client_params)
return "%s?%s" % (self.api.authorize_url, url_params)
return "{url}?{params}".format(url=self.api.authorize_url, params=url_params)

def _data_for_exchange(self, code=None, username=None, password=None, scope=None, user_id=None):
client_params = {
Expand Down Expand Up @@ -106,7 +106,7 @@ def get_authorize_login_url(self, scope=None):
url = self._url_for_authorize(scope=scope)
response, content = http_object.request(url)
if response['status'] != '200':
raise OAuth2AuthExchangeError("The server returned a non-200 response for URL %s" % url)
raise OAuth2AuthExchangeError("The server returned a non-200 response for URL {url}".format(url))

redirected_to = response['Content-Location']
return redirected_to
Expand All @@ -133,10 +133,10 @@ def __init__(self, api):
def _generate_sig(self, endpoint, params, secret):
# handle unicode when signing, urlencode can't handle otherwise.
def enc_if_str(p):
return p.encode('utf-8') if isinstance(p, unicode) else p
return p.encode('utf-8') if isinstance(p, six.text_type) else p

p = ''.join('|{}={}'.format(k, enc_if_str(params[k])) for k in sorted(params.keys()))
sig = '{}{}'.format(endpoint, p)
path = ''.join('|{key}={val}'.format(key=key, val=enc_if_str(params[key])) for key in sorted(params.keys()))
sig = '{endpoint}{path}'.format(endpoint=endpoint, path=path)
return hmac.new(secret.encode(), sig.encode(), sha256).hexdigest()

def url_for_get(self, path, parameters):
Expand All @@ -148,27 +148,30 @@ def get_request(self, path, **kwargs):
def post_request(self, path, **kwargs):
return self.make_request(self.prepare_request("POST", path, kwargs))

# TODO - make use of six.moves.urllib.parse.urlparse for all this string munging
def _full_url(self, path, include_secret=False, include_signed_request=True):
return "%s://%s%s%s%s%s" % (self.api.protocol, self.api.host, self.api.base_path, path,
self._auth_query(include_secret),
self._signed_request(path, {}, include_signed_request, include_secret))
signed_request = self._signed_request(path, {}, include_signed_request, include_secret)
return "{protocol}://{host}{basepath}{path}{query}{signed}".format(
protocol=self.api.protocol, host=self.api.host, basepath=self.api.base_path, path=path,
query=self._auth_query(include_secret), signed=signed_request)

def _full_url_with_params(self, path, params, include_secret=False, include_signed_request=True):
return (self._full_url(path, include_secret) +
self._full_query_with_params(params) +
self._signed_request(path, params, include_signed_request, include_secret))
signed_request = self._signed_request(path, params, include_signed_request, include_secret)
return "{url}{query}{signed}".format(
url=self._full_url(path, include_secret), query=self._full_query_with_params(params), signed=signed_request)

def _full_query_with_params(self, params):
params = ("&" + urlencode(params)) if params else ""
return params
if not params:
return ""
return "&{params}".format(params=urlencode(params))

def _auth_query(self, include_secret=False):
if self.api.access_token:
return ("?%s=%s" % (self.api.access_token_field, self.api.access_token))
return "?{field}={token}".format(field=self.api.access_token_field, token=self.api.access_token)
elif self.api.client_id:
base = ("?client_id=%s" % (self.api.client_id))
base = "?client_id={client_id}".format(client_id=self.api.client_id)
if include_secret:
base += "&client_secret=%s" % (self.api.client_secret)
base += "&client_secret={client_secret}".format(client_secret=self.api.client_secret)
return base

def _signed_request(self, path, params, include_signed_request, include_secret):
Expand All @@ -181,7 +184,7 @@ def _signed_request(self, path, params, include_signed_request, include_secret):
if include_secret and self.api.client_secret:
params['client_secret'] = self.api.client_secret

return "&sig=%s" % self._generate_sig(path, params, self.api.client_secret)
return "&sig={signed}".format(signed=self._generate_sig(path, params, self.api.client_secret))
else:
return ''

Expand All @@ -195,15 +198,16 @@ def get_content_type(file_name):
return mimetypes.guess_type(file_name)[0] or "application/octet-stream"

def encode_field(field_name):
return ("--" + boundary,
'Content-Disposition: form-data; name="%s"' % (field_name),
return ("--{boundary}".format(boundary=boundary),
'Content-Disposition: form-data; name="{field_name}"'.format(field_name=field_name),
"", str(params[field_name]))

def encode_file(field_name):
file_name, file_handle = files[field_name]
return ("--" + boundary,
'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, file_name),
"Content-Type: " + get_content_type(file_name),
return ("--{boundary}".format(boundary=boundary),
'Content-Disposition: form-data; name="{field_name}"; filename="{file_name}"'.format(
field_name=field_name, file_name=file_name),
"Content-Type: {content_type}".format(content_type=get_content_type(file_name)),
"", file_handle.read())

lines = []
Expand All @@ -212,10 +216,10 @@ def encode_file(field_name):
for field in files:
lines.extend(encode_file(field))

lines.extend(("--%s--" % (boundary), ""))
lines.extend(("--{boundary}--".format(boundary=boundary), ""))
body = "\r\n".join(lines)

headers = {"Content-Type": "multipart/form-data; boundary=" + boundary,
headers = {"Content-Type": "multipart/form-data; boundary={boundary}".format(boundary=boundary),
"Content-Length": str(len(body))}

return body, headers
Expand Down Expand Up @@ -243,12 +247,12 @@ def prepare_request(self, method, path, params, include_secret=False):
def make_request(self, url, method="GET", body=None, headers=None):
headers = headers or {}
if 'User-Agent' not in headers:
headers.update({"User-Agent": "%s Python Client" % self.api.api_name})
headers.update({"User-Agent": "{api_name} Python Client".format(api_name=self.api.api_name)})

# https://github.com/jcgregorio/httplib2/issues/173
# bug in httplib2 w/ Python 3 and disable_ssl_certificate_validation=True
if six.PY3:
http_obj = Http(timeout=self.api.timeout)
http_obj = Http(timeout=self.api.timeout)
else:
http_obj = Http(timeout=self.api.timeout, disable_ssl_certificate_validation=True)

Expand Down
10 changes: 10 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ def test_xauth_exchange(self):
assert access_token


class OAuth2RequestTests(unittest.TestCase):
def setUp(self):
super(OAuth2RequestTests, self).setUp()
self.api = TestInstagramAPI(access_token=access_token)
self.request = oauth2.OAuth2Request(self.api)

def test_generate_sig(self):
self.request._generate_sig(endpoint='/', params=dict(count=1), secret=client_secret)


class InstagramAPITests(unittest.TestCase):
def setUp(self):
super(InstagramAPITests, self).setUp()
Expand Down