-
-
Notifications
You must be signed in to change notification settings - Fork 4.7k
feat(oauth): Implement PKCE (S256) and enforce redirect_uri binding #99452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
48c7279
cf9375b
2188991
123923d
a2d3d5d
e957c56
49c6bfe
718cdc4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| from django.db import migrations, models | ||
|
|
||
| from sentry.new_migrations.migrations import CheckedMigration | ||
|
|
||
|
|
||
| class Migration(CheckedMigration): | ||
| # Simple nullable columns; safe to run during deploy | ||
| is_post_deployment = False | ||
|
|
||
| dependencies = [ | ||
| ("sentry", "0980_integrations_json_field"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name="apigrant", | ||
| name="code_challenge", | ||
| field=models.CharField(max_length=128, null=True, blank=True), | ||
| ), | ||
| migrations.AddField( | ||
| model_name="apigrant", | ||
| name="code_challenge_method", | ||
| field=models.CharField(max_length=10, null=True, blank=True), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import re | ||
|
|
||
| # PKCE helpers shared between OAuth authorize/token views | ||
|
|
||
| PKCE_METHOD_S256 = "S256" | ||
| PKCE_METHOD_PLAIN = "plain" | ||
| PKCE_DEFAULT_METHOD = PKCE_METHOD_PLAIN | ||
| _PKCE_CODE_PATTERN = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") | ||
|
|
||
|
|
||
| def validate_code_challenge(challenge: str | None) -> bool: | ||
| if not challenge: | ||
| return False | ||
| return bool(_PKCE_CODE_PATTERN.match(challenge)) | ||
|
|
||
|
|
||
| def normalize_pkce_method(method_raw: str | None) -> str | None: | ||
| if not method_raw: | ||
| return PKCE_DEFAULT_METHOD | ||
|
|
||
| method_key = method_raw.upper() | ||
| if method_key == "S256": | ||
| return PKCE_METHOD_S256 | ||
| if method_key == "PLAIN": | ||
| return PKCE_METHOD_PLAIN | ||
| return None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| from sentry.users.models.user import User | ||
| from sentry.users.services.user.service import user_service | ||
| from sentry.utils import metrics | ||
| from sentry.utils.oauth import PKCE_METHOD_PLAIN, normalize_pkce_method, validate_code_challenge | ||
| from sentry.web.frontend.auth_login import AuthLoginView | ||
|
|
||
| logger = logging.getLogger("sentry.api.oauth_authorize") | ||
|
|
@@ -85,6 +86,8 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: | |
| redirect_uri = request.GET.get("redirect_uri") | ||
| state = request.GET.get("state") | ||
| force_prompt = request.GET.get("force_prompt") | ||
| code_challenge = request.GET.get("code_challenge") | ||
| code_challenge_method = request.GET.get("code_challenge_method") | ||
|
|
||
| if not client_id: | ||
| return self.error( | ||
|
|
@@ -151,6 +154,52 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: | |
| err_response="client_id", | ||
| ) | ||
|
|
||
| # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed; | ||
| # for v0, allow "plain" to avoid breakage. Spec default for missing method is "plain" (RFC 7636 §4.3). | ||
| if code_challenge_method and not code_challenge: | ||
| return self.error( | ||
| request=request, | ||
| client_id=client_id, | ||
| response_type=response_type, | ||
| redirect_uri=redirect_uri, | ||
| name="invalid_request", | ||
| err_response="code_challenge", | ||
| ) | ||
|
|
||
| if code_challenge: | ||
| method = normalize_pkce_method(code_challenge_method) | ||
|
|
||
| if method is None: | ||
| return self.error( | ||
| request=request, | ||
| client_id=client_id, | ||
| response_type=response_type, | ||
| redirect_uri=redirect_uri, | ||
| name="invalid_request", | ||
| err_response="code_challenge_method", | ||
| ) | ||
| if not validate_code_challenge(code_challenge): | ||
| return self.error( | ||
| request=request, | ||
| client_id=client_id, | ||
| response_type=response_type, | ||
| redirect_uri=redirect_uri, | ||
| name="invalid_request", | ||
| err_response="code_challenge", | ||
| ) | ||
| if method == PKCE_METHOD_PLAIN: | ||
| app_version = application.version | ||
| if app_version >= 1: | ||
| return self.error( | ||
| request=request, | ||
| client_id=client_id, | ||
| response_type=response_type, | ||
| redirect_uri=redirect_uri, | ||
| name="invalid_request", | ||
| err_response="code_challenge_method", | ||
| ) | ||
| code_challenge_method = method | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Version Access Errors and PKCE Validation IssuesAccessing Additional Locations (1) |
||
|
|
||
| scopes_s = request.GET.get("scope") | ||
| if scopes_s: | ||
| scopes = scopes_s.split(" ") | ||
|
|
@@ -182,13 +231,17 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: | |
| state=state, | ||
| ) | ||
|
|
||
| # Defer PKCE validation until after we have `application` to check version. | ||
|
|
||
| payload = { | ||
| "rt": response_type, | ||
| "cid": client_id, | ||
| "ru": redirect_uri, | ||
| "sc": scopes, | ||
| "st": state, | ||
| "uid": request.user.id if request.user.is_authenticated else "", | ||
| "cc": code_challenge or "", | ||
| "ccm": code_challenge_method or "", | ||
| } | ||
| request.session["oa2"] = payload | ||
|
|
||
|
|
@@ -225,6 +278,8 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: | |
| "sc": scopes, | ||
| "st": state, | ||
| "uid": request.user.id, | ||
| "cc": code_challenge or "", | ||
| "ccm": code_challenge_method or "", | ||
| } | ||
| request.session["oa2"] = payload | ||
|
|
||
|
|
@@ -351,6 +406,10 @@ def approve( | |
| redirect_uri, | ||
| state, | ||
| ) -> HttpResponseBase: | ||
| # Pull PKCE data (if any) from the session payload prepared during GET | ||
| sess_payload = request.session.get("oa2", {}) | ||
| sess_code_challenge = sess_payload.get("cc") or None | ||
| sess_code_challenge_method = sess_payload.get("ccm") or None | ||
| # Some applications require org level access, so user who approves only gives | ||
| # access to that organization by selecting one. If None, means the application | ||
| # has user level access and will be able to have access to all the organizations of that user. | ||
|
|
@@ -391,6 +450,8 @@ def approve( | |
| redirect_uri=redirect_uri, | ||
| scope_list=scopes, | ||
| organization_id=selected_organization_id, | ||
| code_challenge=sess_code_challenge, | ||
| code_challenge_method=sess_code_challenge_method, | ||
| ) | ||
| logger.info( | ||
| "approve.grant", | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.