From 48c7279641f59da3d0d152b9c2a6ee9d2f194e6b Mon Sep 17 00:00:00 2001 From: David Cramer Date: Sun, 14 Sep 2025 14:03:38 -0400 Subject: [PATCH 1/8] feat(oauth): Implement PKCE (S256) and enforce redirect_uri binding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RFCs: - RFC 7636 (PKCE): §4.1/§4.2/§4.3/§4.4/§4.6 - RFC 6749 (OAuth 2.0): §4.1.3 (redirect_uri binding) - OAuth 2.1 draft (draft-ietf-oauth-v2-1-13) Changes: - Model/migration: add ApiGrant.code_challenge + code_challenge_method (nullable) - Authorization: accept + persist code_challenge(+_method); default S256; deny plain by default - Token: require code_verifier when grant has challenge; length/charset per RFC 7636; S256 verification; optional plain behind flag - Token: enforce redirect_uri equality when stored on grant (no fallback) - Tests: S256 happy path; missing/mismatch/length/charset; optional plain; redirect binding Notes: - Local constants gate plain method; discovery metadata excluded (separate task) - No implicit flow changes; non-cacheable responses preserved Refs: getsentry/sentry#99002 --- .../migrations/0981_apigrant_pkce_fields.py | 25 ++ src/sentry/models/apigrant.py | 6 + src/sentry/web/frontend/oauth_authorize.py | 41 ++++ src/sentry/web/frontend/oauth_token.py | 41 +++- .../web/frontend/test_oauth_authorize.py | 23 ++ tests/sentry/web/frontend/test_oauth_token.py | 226 +++++++++++++++++- 6 files changed, 346 insertions(+), 16 deletions(-) create mode 100644 src/sentry/migrations/0981_apigrant_pkce_fields.py diff --git a/src/sentry/migrations/0981_apigrant_pkce_fields.py b/src/sentry/migrations/0981_apigrant_pkce_fields.py new file mode 100644 index 00000000000000..80ec42199ae3bb --- /dev/null +++ b/src/sentry/migrations/0981_apigrant_pkce_fields.py @@ -0,0 +1,25 @@ +from django.db import migrations, models + +from sentry.new_migrations.migrations import CheckedMigration + + +class Migration(CheckedMigration): + # Simple nullable columns; safe to run during deploy + is_post_deployment = False + + dependencies = [ + ("sentry", "0980_integrations_json_field"), + ] + + operations = [ + migrations.AddField( + model_name="apigrant", + name="code_challenge", + field=models.CharField(max_length=128, null=True, blank=True), + ), + migrations.AddField( + model_name="apigrant", + name="code_challenge_method", + field=models.CharField(max_length=10, null=True, blank=True), + ), + ] diff --git a/src/sentry/models/apigrant.py b/src/sentry/models/apigrant.py index 7e368f1b7a9d72..91ab17bdeb47a4 100644 --- a/src/sentry/models/apigrant.py +++ b/src/sentry/models/apigrant.py @@ -83,6 +83,12 @@ class ApiGrant(Model): on_delete="CASCADE", ) + # PKCE (RFC 7636) parameters persisted from authorization request + # - code_challenge: BASE64URL-encoded string (43-128 chars) + # - code_challenge_method: typically "S256"; "plain" optionally allowed + code_challenge = models.CharField(max_length=128, null=True, blank=True) + code_challenge_method = models.CharField(max_length=10, null=True, blank=True) + class Meta: app_label = "sentry" db_table = "sentry_apigrant" diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index 6e9d239753a719..e416df40e2004d 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -22,6 +22,10 @@ logger = logging.getLogger("sentry.api.oauth_authorize") +# PKCE behavior: Prefer S256; optionally allow "plain" when toggled on +PKCE_ALLOW_PLAIN = False +PKCE_DEFAULT_METHOD = "S256" + class OAuthAuthorizeView(AuthLoginView): auth_required = False @@ -85,6 +89,8 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: redirect_uri = request.GET.get("redirect_uri") state = request.GET.get("state") force_prompt = request.GET.get("force_prompt") + code_challenge = request.GET.get("code_challenge") + code_challenge_method = request.GET.get("code_challenge_method") if not client_id: return self.error( @@ -182,6 +188,31 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: state=state, ) + # Validate PKCE inputs (when provided). We only allow S256 by default. + if code_challenge: + # Default to S256 if method omitted + method = (code_challenge_method or PKCE_DEFAULT_METHOD).upper() + if method not in ("S256", "PLAIN"): + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge_method", + ) + if method == "PLAIN" and not PKCE_ALLOW_PLAIN: + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge_method", + ) + # Normalize casing for storage + code_challenge_method = method + payload = { "rt": response_type, "cid": client_id, @@ -189,6 +220,8 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: "sc": scopes, "st": state, "uid": request.user.id if request.user.is_authenticated else "", + "cc": code_challenge or "", + "ccm": code_challenge_method or "", } request.session["oa2"] = payload @@ -225,6 +258,8 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: "sc": scopes, "st": state, "uid": request.user.id, + "cc": code_challenge or "", + "ccm": code_challenge_method or "", } request.session["oa2"] = payload @@ -351,6 +386,10 @@ def approve( redirect_uri, state, ) -> HttpResponseBase: + # Pull PKCE data (if any) from the session payload prepared during GET + sess_payload = request.session.get("oa2", {}) if hasattr(request, "session") else {} + sess_code_challenge = sess_payload.get("cc") or None + sess_code_challenge_method = sess_payload.get("ccm") or None # Some applications require org level access, so user who approves only gives # access to that organization by selecting one. If None, means the application # has user level access and will be able to have access to all the organizations of that user. @@ -391,6 +430,8 @@ def approve( redirect_uri=redirect_uri, scope_list=scopes, organization_id=selected_organization_id, + code_challenge=sess_code_challenge, + code_challenge_method=sess_code_challenge_method, ) logger.info( "approve.grant", diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index 8f61ed10ae9d88..a5ad2cb25038d4 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -1,6 +1,9 @@ from __future__ import annotations +import base64 +import hashlib import logging +import re from datetime import datetime from typing import Literal, NotRequired, TypedDict @@ -24,6 +27,10 @@ logger = logging.getLogger("sentry.api.oauth_token") +# PKCE behavior: Prefer S256; allow "plain" only when toggled on +PKCE_ALLOW_PLAIN = False +_PKCE_VERIFIER_RE = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") + class _TokenInformationUser(TypedDict): id: str @@ -138,11 +145,37 @@ def get_access_tokens(self, request: Request, application: ApiApplication) -> di if grant.is_expired(): return {"error": "invalid_grant", "reason": "grant expired"} + # Enforce redirect_uri binding: when the grant was issued with a redirect_uri, + # the token request MUST include the identical redirect_uri (RFC 6749 §4.1.3). redirect_uri = request.POST.get("redirect_uri") - if not redirect_uri: - redirect_uri = application.get_default_redirect_uri() - elif grant.redirect_uri != redirect_uri: - return {"error": "invalid_grant", "reason": "invalid redirect URI"} + if grant.redirect_uri: + if not redirect_uri or grant.redirect_uri != redirect_uri: + return {"error": "invalid_grant", "reason": "invalid redirect URI"} + + # PKCE verification (RFC 7636): if the grant has a stored code_challenge, + # require a valid code_verifier on the token request. + if grant.code_challenge: + code_verifier = request.POST.get("code_verifier") + if not code_verifier: + return {"error": "invalid_grant", "reason": "missing code_verifier"} + # Enforce length and charset + if len(code_verifier) < 43 or len(code_verifier) > 128: + return {"error": "invalid_grant", "reason": "invalid code_verifier length"} + if not _PKCE_VERIFIER_RE.match(code_verifier): + return {"error": "invalid_grant", "reason": "invalid code_verifier charset"} + + method = (grant.code_challenge_method or "S256").upper() + if method == "S256": + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + computed = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + if computed != grant.code_challenge: + return {"error": "invalid_grant", "reason": "pkce verification failed"} + elif method == "PLAIN": + if not PKCE_ALLOW_PLAIN or code_verifier != grant.code_challenge: + return {"error": "invalid_grant", "reason": "pkce verification failed"} + else: + # Unknown method on stored grant; reject + return {"error": "invalid_grant", "reason": "unsupported pkce method"} try: token_data = {"token": ApiToken.from_grant(grant=grant)} diff --git a/tests/sentry/web/frontend/test_oauth_authorize.py b/tests/sentry/web/frontend/test_oauth_authorize.py index 8e7574a15225ee..51acdbd657f41e 100644 --- a/tests/sentry/web/frontend/test_oauth_authorize.py +++ b/tests/sentry/web/frontend/test_oauth_authorize.py @@ -97,6 +97,29 @@ def test_minimal_params_approve_flow(self) -> None: authorization = ApiAuthorization.objects.get(user=self.user, application=self.application) assert authorization.get_scopes() == grant.get_scopes() + def test_persists_pkce_fields_when_provided(self) -> None: + self.login_as(self.user) + + # Provide PKCE inputs on the authorization request + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}" + "&code_challenge=WWxwZkV3MldEdmZ2N2d0X1pPZ1NrZzN1RDl0MHBZQUpzQ1ZzQWxkSW1aZw" + "&code_challenge_method=S256" + ) + + assert resp.status_code == 200 + self.assertTemplateUsed("sentry/oauth-authorize.html") + + # Approve flow should persist PKCE fields on the ApiGrant + resp = self.client.post(self.path, {"op": "approve"}) + assert resp.status_code == 302 + + grant = ApiGrant.objects.get(user=self.user) + assert grant.code_challenge == ( + "WWxwZkV3MldEdmZ2N2d0X1pPZ1NrZzN1RDl0MHBZQUpzQ1ZzQWxkSW1aZw" + ) + assert grant.code_challenge_method == "S256" + def test_minimal_params_deny_flow(self) -> None: self.login_as(self.user) diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 683a47c7d49541..0771f1a9b0dc09 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -220,6 +220,7 @@ def test_grant_lock(self) -> None: "grant_type": "authorization_code", "code": self.grant.code, "client_id": self.application.client_id, + "redirect_uri": self.application.get_default_redirect_uri(), "client_secret": self.client_secret, }, ) @@ -254,6 +255,7 @@ def test_no_open_id_token(self) -> None: "grant_type": "authorization_code", "code": self.grant.code, "client_id": self.application.client_id, + "redirect_uri": self.application.get_default_redirect_uri(), "client_secret": self.client_secret, }, ) @@ -262,9 +264,9 @@ def test_no_open_id_token(self) -> None: data = json.loads(resp.content) assert "id_token" not in data - def test_valid_no_redirect_uri(self) -> None: + def test_redirect_binding_required_when_grant_has_redirect(self) -> None: """ - Checks that we get the correct redirect URI if we don't pass one in + Omitting redirect_uri on token exchange must fail when grant stored it. """ self.login_as(self.user) @@ -278,19 +280,219 @@ def test_valid_no_redirect_uri(self) -> None: }, ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + def test_redirect_binding_mismatch(self) -> None: + self.login_as(self.user) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "code": self.grant.code, + "client_id": self.application.client_id, + "redirect_uri": "https://example.org/callback", + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + def test_pkce_s256_happy_path(self) -> None: + import base64 + import hashlib + + self.login_as(self.user) + # Create a grant with S256 code challenge derived from a known verifier + verifier = "a" * 50 + digest = hashlib.sha256(verifier.encode("ascii")).digest() + challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge=challenge, + code_challenge_method="S256", + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "code_verifier": verifier, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) assert resp.status_code == 200 - data = json.loads(resp.content) - token = ApiToken.objects.get(token=data["access_token"]) - assert token.application == self.application - assert token.user == self.grant.user - assert token.get_scopes() == self.grant.get_scopes() + def test_pkce_missing_verifier(self) -> None: + self.login_as(self.user) + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 43, + code_challenge_method="S256", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} - assert data["access_token"] == token.token - assert data["refresh_token"] == token.refresh_token - assert isinstance(data["expires_in"], int) - assert data["token_type"] == "bearer" - assert data["user"]["id"] == str(token.user_id) + def test_pkce_length_and_charset(self) -> None: + self.login_as(self.user) + # too short + grant1 = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 43, + code_challenge_method="S256", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant1.code, + "code_verifier": "a" * 42, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + # too long + grant2 = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 43, + code_challenge_method="S256", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant2.code, + "code_verifier": "a" * 129, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + # invalid charset + grant3 = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 43, + code_challenge_method="S256", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant3.code, + "code_verifier": "invalid*chars", + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + def test_pkce_s256_mismatch(self) -> None: + self.login_as(self.user) + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 43, + code_challenge_method="S256", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "code_verifier": "a" * 50, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + def test_pkce_plain_mode(self) -> None: + # plain should be denied by default + self.login_as(self.user) + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="a" * 50, + code_challenge_method="PLAIN", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "code_verifier": "a" * 50, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + # Temporarily allow plain and expect success + from sentry.web.frontend import oauth_token as oauth_token_module + + original = oauth_token_module.PKCE_ALLOW_PLAIN + oauth_token_module.PKCE_ALLOW_PLAIN = True + try: + grant2 = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="b" * 60, + code_challenge_method="plain", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant2.code, + "code_verifier": "b" * 60, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 200 + finally: + oauth_token_module.PKCE_ALLOW_PLAIN = original def test_valid_params(self) -> None: self.login_as(self.user) From cf9375bee08d960f671a31d9e9f17be85cd025fd Mon Sep 17 00:00:00 2001 From: David Cramer Date: Sun, 14 Sep 2025 14:23:30 -0400 Subject: [PATCH 2/8] ref(oauth): Version-gated legacy behavior uses logger warnings, not metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - v0: log warnings when redirect_uri fallback or missing PKCE verifier are used - v1: unchanged strict behavior RFCs: RFC 7636; RFC 6749 §4.1.3 Refs: getsentry/sentry#99002 --- src/sentry/web/frontend/oauth_authorize.py | 51 +++---- src/sentry/web/frontend/oauth_token.py | 134 ++++++++++++++---- tests/sentry/web/frontend/test_oauth_token.py | 2 + 3 files changed, 133 insertions(+), 54 deletions(-) diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index e416df40e2004d..ce53c9b74d6537 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -157,6 +157,32 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: err_response="client_id", ) + # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed by default; + # for v0, allow "plain" to avoid breakage. + if code_challenge: + method = (code_challenge_method or PKCE_DEFAULT_METHOD).upper() + if method not in ("S256", "PLAIN"): + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge_method", + ) + if method == "PLAIN": + app_version = getattr(application, "version", 0) or 0 + if app_version >= 1 and not PKCE_ALLOW_PLAIN: + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge_method", + ) + code_challenge_method = method + scopes_s = request.GET.get("scope") if scopes_s: scopes = scopes_s.split(" ") @@ -188,30 +214,7 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: state=state, ) - # Validate PKCE inputs (when provided). We only allow S256 by default. - if code_challenge: - # Default to S256 if method omitted - method = (code_challenge_method or PKCE_DEFAULT_METHOD).upper() - if method not in ("S256", "PLAIN"): - return self.error( - request=request, - client_id=client_id, - response_type=response_type, - redirect_uri=redirect_uri, - name="invalid_request", - err_response="code_challenge_method", - ) - if method == "PLAIN" and not PKCE_ALLOW_PLAIN: - return self.error( - request=request, - client_id=client_id, - response_type=response_type, - redirect_uri=redirect_uri, - name="invalid_request", - err_response="code_challenge_method", - ) - # Normalize casing for storage - code_challenge_method = method + # Defer PKCE validation until after we have `application` to check version. payload = { "rt": response_type, diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index a5ad2cb25038d4..dea805332100b8 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -145,37 +145,26 @@ def get_access_tokens(self, request: Request, application: ApiApplication) -> di if grant.is_expired(): return {"error": "invalid_grant", "reason": "grant expired"} - # Enforce redirect_uri binding: when the grant was issued with a redirect_uri, - # the token request MUST include the identical redirect_uri (RFC 6749 §4.1.3). + # Enforce redirect_uri binding with application-version-aware behavior + app_version = getattr(grant.application, "version", 0) or 0 redirect_uri = request.POST.get("redirect_uri") - if grant.redirect_uri: - if not redirect_uri or grant.redirect_uri != redirect_uri: - return {"error": "invalid_grant", "reason": "invalid redirect URI"} - - # PKCE verification (RFC 7636): if the grant has a stored code_challenge, - # require a valid code_verifier on the token request. - if grant.code_challenge: - code_verifier = request.POST.get("code_verifier") - if not code_verifier: - return {"error": "invalid_grant", "reason": "missing code_verifier"} - # Enforce length and charset - if len(code_verifier) < 43 or len(code_verifier) > 128: - return {"error": "invalid_grant", "reason": "invalid code_verifier length"} - if not _PKCE_VERIFIER_RE.match(code_verifier): - return {"error": "invalid_grant", "reason": "invalid code_verifier charset"} - - method = (grant.code_challenge_method or "S256").upper() - if method == "S256": - digest = hashlib.sha256(code_verifier.encode("ascii")).digest() - computed = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") - if computed != grant.code_challenge: - return {"error": "invalid_grant", "reason": "pkce verification failed"} - elif method == "PLAIN": - if not PKCE_ALLOW_PLAIN or code_verifier != grant.code_challenge: - return {"error": "invalid_grant", "reason": "pkce verification failed"} - else: - # Unknown method on stored grant; reject - return {"error": "invalid_grant", "reason": "unsupported pkce method"} + redirect_check = self._check_redirect_binding( + application=application, + grant=grant, + request_redirect_uri=redirect_uri, + app_version=app_version, + ) + if redirect_check is not None: + return redirect_check + + # PKCE verification with application-version-aware behavior + pkce_error = self._verify_pkce( + grant=grant, + code_verifier=request.POST.get("code_verifier"), + app_version=app_version, + ) + if pkce_error is not None: + return pkce_error try: token_data = {"token": ApiToken.from_grant(grant=grant)} @@ -194,6 +183,91 @@ def get_access_tokens(self, request: Request, application: ApiApplication) -> di return token_data + @staticmethod + def _compute_s256(code_verifier: str) -> str: + digest = hashlib.sha256(code_verifier.encode("ascii")).digest() + return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + + def _check_redirect_binding( + self, + *, + application: ApiApplication, + grant: ApiGrant, + request_redirect_uri: str | None, + app_version: int, + ) -> dict | None: + """Validate redirect_uri binding and log legacy fallback. + + Returns an error dict on failure, otherwise None. The resolved redirect_uri + is not used downstream, so there is no return of the value here. + """ + if app_version >= 1: + if grant.redirect_uri and ( + not request_redirect_uri or grant.redirect_uri != request_redirect_uri + ): + return {"error": "invalid_grant", "reason": "invalid redirect URI"} + return None + + # v0 legacy behavior + if not request_redirect_uri: + logger.warning( + "oauth.token-legacy-redirect-fallback", + extra={ + "application_id": grant.application_id, + "client_id": application.client_id, + "grant_id": grant.id, + }, + ) + # Allow fallback; nothing else to check + return None + + if grant.redirect_uri != request_redirect_uri: + return {"error": "invalid_grant", "reason": "invalid redirect URI"} + return None + + def _verify_pkce( + self, *, grant: ApiGrant, code_verifier: str | None, app_version: int + ) -> dict | None: + if not grant.code_challenge: + return None + + # For v0: missing verifier is allowed (log), provided one must validate + if app_version < 1 and not code_verifier: + logger.warning( + "oauth.token-legacy-pkce-missing-verifier", + extra={ + "application_id": grant.application_id, + "client_id": grant.application.client_id, + "grant_id": grant.id, + }, + ) + return None + + # For v1 or when provided in v0, validate per RFC 7636 + if not code_verifier: + return {"error": "invalid_grant", "reason": "missing code_verifier"} + if len(code_verifier) < 43 or len(code_verifier) > 128: + return {"error": "invalid_grant", "reason": "invalid code_verifier length"} + if not _PKCE_VERIFIER_RE.match(code_verifier): + return {"error": "invalid_grant", "reason": "invalid code_verifier charset"} + + method = (grant.code_challenge_method or "S256").upper() + if method == "S256": + computed = self._compute_s256(code_verifier) + if computed != grant.code_challenge: + return {"error": "invalid_grant", "reason": "pkce verification failed"} + return None + + if method == "PLAIN": + # v1 respects PKCE_ALLOW_PLAIN; v0 allows plain regardless + if app_version >= 1 and not PKCE_ALLOW_PLAIN: + return {"error": "invalid_grant", "reason": "pkce verification failed"} + if code_verifier != grant.code_challenge: + return {"error": "invalid_grant", "reason": "pkce verification failed"} + return None + + return {"error": "invalid_grant", "reason": "unsupported pkce method"} + def get_refresh_token(self, request: Request, application: ApiApplication) -> dict: refresh_token_code = request.POST.get("refresh_token") scope = request.POST.get("scope") diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 0771f1a9b0dc09..8c936cdd6e8ec8 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -54,6 +54,8 @@ def setUp(self) -> None: self.application = ApiApplication.objects.create( owner=self.user, redirect_uris="https://example.com" ) + # Use application version 1 for strict OAuth 2.1 behaviors in these tests + self.application.update(version=1) self.client_secret = self.application.client_secret self.grant = ApiGrant.objects.create( user=self.user, application=self.application, redirect_uri="https://example.com" From 2188991c5784192ec4f6f43918daf490207065e5 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Sun, 14 Sep 2025 14:28:36 -0400 Subject: [PATCH 3/8] ref(oauth): Remove PKCE_ALLOW_PLAIN; gate plain via ApiApplication.version MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - v1: reject code_challenge_method=plain at authorize and token - v0: allow plain for compatibility - Update tests to switch app version for plain success path RFCs: RFC 7636 (PKCE); RFC 6749 §4.1.3 Refs: getsentry/sentry#99002 --- src/sentry/web/frontend/oauth_authorize.py | 7 ++- src/sentry/web/frontend/oauth_token.py | 6 +-- tests/sentry/web/frontend/test_oauth_token.py | 50 ++++++++----------- 3 files changed, 27 insertions(+), 36 deletions(-) diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index ce53c9b74d6537..e12370ed20b179 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -22,8 +22,7 @@ logger = logging.getLogger("sentry.api.oauth_authorize") -# PKCE behavior: Prefer S256; optionally allow "plain" when toggled on -PKCE_ALLOW_PLAIN = False +# PKCE behavior: Prefer S256; v1 applications require S256; v0 allow plain for compatibility PKCE_DEFAULT_METHOD = "S256" @@ -157,7 +156,7 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: err_response="client_id", ) - # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed by default; + # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed; # for v0, allow "plain" to avoid breakage. if code_challenge: method = (code_challenge_method or PKCE_DEFAULT_METHOD).upper() @@ -172,7 +171,7 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: ) if method == "PLAIN": app_version = getattr(application, "version", 0) or 0 - if app_version >= 1 and not PKCE_ALLOW_PLAIN: + if app_version >= 1: return self.error( request=request, client_id=client_id, diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index dea805332100b8..7eb2760d42f7dd 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -27,8 +27,6 @@ logger = logging.getLogger("sentry.api.oauth_token") -# PKCE behavior: Prefer S256; allow "plain" only when toggled on -PKCE_ALLOW_PLAIN = False _PKCE_VERIFIER_RE = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") @@ -259,8 +257,8 @@ def _verify_pkce( return None if method == "PLAIN": - # v1 respects PKCE_ALLOW_PLAIN; v0 allows plain regardless - if app_version >= 1 and not PKCE_ALLOW_PLAIN: + # v1 forbids plain; v0 allows plain + if app_version >= 1: return {"error": "invalid_grant", "reason": "pkce verification failed"} if code_verifier != grant.code_challenge: return {"error": "invalid_grant", "reason": "pkce verification failed"} diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 8c936cdd6e8ec8..8ed5c4e19f3f8c 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -445,7 +445,7 @@ def test_pkce_s256_mismatch(self) -> None: assert json.loads(resp.content) == {"error": "invalid_grant"} def test_pkce_plain_mode(self) -> None: - # plain should be denied by default + # plain should be denied for v1 applications, allowed for v0 self.login_as(self.user) grant = ApiGrant.objects.create( user=self.user, @@ -468,33 +468,27 @@ def test_pkce_plain_mode(self) -> None: assert resp.status_code == 400 assert json.loads(resp.content) == {"error": "invalid_grant"} - # Temporarily allow plain and expect success - from sentry.web.frontend import oauth_token as oauth_token_module - - original = oauth_token_module.PKCE_ALLOW_PLAIN - oauth_token_module.PKCE_ALLOW_PLAIN = True - try: - grant2 = ApiGrant.objects.create( - user=self.user, - application=self.application, - redirect_uri=self.application.get_default_redirect_uri(), - code_challenge="b" * 60, - code_challenge_method="plain", - ) - resp = self.client.post( - self.path, - { - "grant_type": "authorization_code", - "redirect_uri": self.application.get_default_redirect_uri(), - "code": grant2.code, - "code_verifier": "b" * 60, - "client_id": self.application.client_id, - "client_secret": self.client_secret, - }, - ) - assert resp.status_code == 200 - finally: - oauth_token_module.PKCE_ALLOW_PLAIN = original + # Switch to v0 behavior and expect success + self.application.update(version=0) + grant2 = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="b" * 60, + code_challenge_method="plain", + ) + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant2.code, + "code_verifier": "b" * 60, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + assert resp.status_code == 200 def test_valid_params(self) -> None: self.login_as(self.user) From 123923d7cae4d594f1c0b85c33e8177cf90e1af4 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Sun, 14 Sep 2025 14:33:20 -0400 Subject: [PATCH 4/8] update lockfile --- migrations_lockfile.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt index b54fe679db0928..2c48b1ec90b10e 100644 --- a/migrations_lockfile.txt +++ b/migrations_lockfile.txt @@ -29,7 +29,7 @@ releases: 0001_release_models replays: 0006_add_bulk_delete_job -sentry: 0980_integrations_json_field +sentry: 0981_apigrant_pkce_fields social_auth: 0003_social_auth_json_field From a2d3d5df6d97f064d7164abec17c7455f6ebc5c7 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Tue, 23 Sep 2025 08:50:34 -0700 Subject: [PATCH 5/8] fix(oauth): normalize PKCE defaults and validation --- src/sentry/web/frontend/oauth_authorize.py | 37 +++++++--- src/sentry/web/frontend/oauth_token.py | 26 +++++-- .../web/frontend/test_oauth_authorize.py | 61 ++++++++++++++++ tests/sentry/web/frontend/test_oauth_token.py | 72 +++++++++++++++++++ 4 files changed, 184 insertions(+), 12 deletions(-) diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index e12370ed20b179..2de53c0dd06655 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -1,6 +1,7 @@ from __future__ import annotations import logging +import re from typing import Any, Literal from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse @@ -22,8 +23,11 @@ logger = logging.getLogger("sentry.api.oauth_authorize") -# PKCE behavior: Prefer S256; v1 applications require S256; v0 allow plain for compatibility -PKCE_DEFAULT_METHOD = "S256" +# PKCE validation helpers +PKCE_METHOD_S256 = "S256" +PKCE_METHOD_PLAIN = "plain" +PKCE_DEFAULT_METHOD = PKCE_METHOD_PLAIN +PKCE_CODE_PATTERN = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") class OAuthAuthorizeView(AuthLoginView): @@ -157,10 +161,18 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: ) # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed; - # for v0, allow "plain" to avoid breakage. + # for v0, allow "plain" to avoid breakage. Spec default for missing method is "plain" (RFC 7636 §4.3). if code_challenge: - method = (code_challenge_method or PKCE_DEFAULT_METHOD).upper() - if method not in ("S256", "PLAIN"): + method_raw = code_challenge_method or PKCE_DEFAULT_METHOD + method_key = method_raw.upper() + if method_key == "S256": + method = PKCE_METHOD_S256 + elif method_key == "PLAIN": + method = PKCE_METHOD_PLAIN + else: + method = None + + if method is None: return self.error( request=request, client_id=client_id, @@ -169,8 +181,17 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: name="invalid_request", err_response="code_challenge_method", ) - if method == "PLAIN": - app_version = getattr(application, "version", 0) or 0 + if not PKCE_CODE_PATTERN.match(code_challenge): + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge", + ) + if method == PKCE_METHOD_PLAIN: + app_version = application.version if app_version >= 1: return self.error( request=request, @@ -389,7 +410,7 @@ def approve( state, ) -> HttpResponseBase: # Pull PKCE data (if any) from the session payload prepared during GET - sess_payload = request.session.get("oa2", {}) if hasattr(request, "session") else {} + sess_payload = request.session.get("oa2", {}) sess_code_challenge = sess_payload.get("cc") or None sess_code_challenge_method = sess_payload.get("ccm") or None # Some applications require org level access, so user who approves only gives diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index 7eb2760d42f7dd..52082bc70cbd4c 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -23,6 +23,11 @@ from sentry.utils import json, metrics from sentry.utils.locking import UnableToAcquireLock from sentry.web.frontend.base import control_silo_view +from sentry.web.frontend.oauth_authorize import ( + PKCE_DEFAULT_METHOD, + PKCE_METHOD_PLAIN, + PKCE_METHOD_S256, +) from sentry.web.frontend.openidtoken import OpenIDToken logger = logging.getLogger("sentry.api.oauth_token") @@ -144,7 +149,7 @@ def get_access_tokens(self, request: Request, application: ApiApplication) -> di return {"error": "invalid_grant", "reason": "grant expired"} # Enforce redirect_uri binding with application-version-aware behavior - app_version = getattr(grant.application, "version", 0) or 0 + app_version = grant.application.version redirect_uri = request.POST.get("redirect_uri") redirect_check = self._check_redirect_binding( application=application, @@ -249,14 +254,27 @@ def _verify_pkce( if not _PKCE_VERIFIER_RE.match(code_verifier): return {"error": "invalid_grant", "reason": "invalid code_verifier charset"} - method = (grant.code_challenge_method or "S256").upper() - if method == "S256": + method_raw = grant.code_challenge_method or "" + method_key = method_raw.upper() + if not method_key: + method = PKCE_DEFAULT_METHOD + elif method_key == "S256": + method = PKCE_METHOD_S256 + elif method_key == "PLAIN": + method = PKCE_METHOD_PLAIN + else: + return {"error": "invalid_grant", "reason": "unsupported pkce method"} + + if method_raw != method: + grant.update(code_challenge_method=method) + + if method == PKCE_METHOD_S256: computed = self._compute_s256(code_verifier) if computed != grant.code_challenge: return {"error": "invalid_grant", "reason": "pkce verification failed"} return None - if method == "PLAIN": + if method == PKCE_METHOD_PLAIN: # v1 forbids plain; v0 allows plain if app_version >= 1: return {"error": "invalid_grant", "reason": "pkce verification failed"} diff --git a/tests/sentry/web/frontend/test_oauth_authorize.py b/tests/sentry/web/frontend/test_oauth_authorize.py index 51acdbd657f41e..d0f051231fde3b 100644 --- a/tests/sentry/web/frontend/test_oauth_authorize.py +++ b/tests/sentry/web/frontend/test_oauth_authorize.py @@ -7,6 +7,7 @@ from sentry.models.apitoken import ApiToken from sentry.testutils.cases import TestCase from sentry.testutils.silo import control_silo_test +from sentry.web.frontend.oauth_authorize import PKCE_METHOD_PLAIN, PKCE_METHOD_S256 @control_silo_test @@ -120,6 +121,66 @@ def test_persists_pkce_fields_when_provided(self) -> None: ) assert grant.code_challenge_method == "S256" + def test_pkce_defaults_to_plain_for_legacy_apps(self) -> None: + self.application.update(version=0) + self.login_as(self.user) + + challenge = "a" * 64 + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}" + f"&code_challenge={challenge}" + ) + + assert resp.status_code == 200 + + resp = self.client.post(self.path, {"op": "approve"}) + assert resp.status_code == 302 + + grant = ApiGrant.objects.get(user=self.user) + assert grant.code_challenge == challenge + assert grant.code_challenge_method == PKCE_METHOD_PLAIN + + def test_pkce_missing_method_rejected_for_v1(self) -> None: + self.application.update(version=1) + self.login_as(self.user) + + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}" + "&code_challenge=" + ("a" * 64) + ) + + assert resp.status_code == 400 + self.assertTemplateUsed("sentry/oauth-error.html") + assert ( + resp.context["error"] == "Missing or invalid code_challenge_method parameter." + ) + + def test_pkce_rejects_invalid_challenge_length(self) -> None: + self.login_as(self.user) + + challenge = "a" * 200 + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}" + f"&code_challenge={challenge}&code_challenge_method={PKCE_METHOD_S256}" + ) + + assert resp.status_code == 400 + self.assertTemplateUsed("sentry/oauth-error.html") + assert resp.context["error"] == "Missing or invalid code_challenge parameter." + + def test_pkce_rejects_invalid_challenge_charset(self) -> None: + self.login_as(self.user) + + challenge = "invalid*chars" + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}" + f"&code_challenge={challenge}&code_challenge_method={PKCE_METHOD_S256}" + ) + + assert resp.status_code == 400 + self.assertTemplateUsed("sentry/oauth-error.html") + assert resp.context["error"] == "Missing or invalid code_challenge parameter." + def test_minimal_params_deny_flow(self) -> None: self.login_as(self.user) diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 8ed5c4e19f3f8c..4837016e42de19 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -9,6 +9,7 @@ from sentry.testutils.cases import TestCase from sentry.testutils.silo import control_silo_test from sentry.utils import json +from sentry.web.frontend.oauth_authorize import PKCE_METHOD_PLAIN @control_silo_test @@ -489,6 +490,77 @@ def test_pkce_plain_mode(self) -> None: }, ) assert resp.status_code == 200 + data = json.loads(resp.content) + token = ApiToken.objects.get(token=data["access_token"]) + assert token.application == self.application + grant2.refresh_from_db() + assert grant2.code_challenge_method == PKCE_METHOD_PLAIN + + def test_pkce_missing_method_defaults_to_plain(self) -> None: + self.application.update(version=0) + self.login_as(self.user) + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="c" * 60, + code_challenge_method=None, + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "code_verifier": "c" * 60, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + + assert resp.status_code == 200 + data = json.loads(resp.content) + token = ApiToken.objects.get(token=data["access_token"]) + assert token.application == self.application + grant.refresh_from_db() + assert grant.code_challenge_method == PKCE_METHOD_PLAIN + + def test_pkce_missing_method_treated_as_plain_failure_when_hashed(self) -> None: + self.application.update(version=0) + self.login_as(self.user) + + import base64 + import hashlib + + verifier = "v" * 50 + digest = hashlib.sha256(verifier.encode("ascii")).digest() + challenge = base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge=challenge, + code_challenge_method=None, + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "code_verifier": verifier, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + grant.refresh_from_db() + assert grant.code_challenge_method == PKCE_METHOD_PLAIN def test_valid_params(self) -> None: self.login_as(self.user) From e957c56e50bfe4e94a9be111d4123c178aab96ca Mon Sep 17 00:00:00 2001 From: David Cramer Date: Tue, 23 Sep 2025 09:51:58 -0700 Subject: [PATCH 6/8] Pull utils into shared components --- src/sentry/utils/oauth.py | 28 +++++++++++++++++++ src/sentry/web/frontend/oauth_authorize.py | 19 ++----------- src/sentry/web/frontend/oauth_token.py | 22 ++++++--------- .../web/frontend/test_oauth_authorize.py | 22 ++++++++++++++- tests/sentry/web/frontend/test_oauth_token.py | 2 +- 5 files changed, 62 insertions(+), 31 deletions(-) create mode 100644 src/sentry/utils/oauth.py diff --git a/src/sentry/utils/oauth.py b/src/sentry/utils/oauth.py new file mode 100644 index 00000000000000..0072490f7166ca --- /dev/null +++ b/src/sentry/utils/oauth.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import re + +# PKCE helpers shared between OAuth authorize/token views + +PKCE_METHOD_S256 = "S256" +PKCE_METHOD_PLAIN = "plain" +PKCE_DEFAULT_METHOD = PKCE_METHOD_PLAIN +_PKCE_CODE_PATTERN = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") + + +def validate_code_challenge(challenge: str | None) -> bool: + if not challenge: + return False + return bool(_PKCE_CODE_PATTERN.match(challenge)) + + +def normalize_pkce_method(method_raw: str | None) -> str | None: + if not method_raw: + return PKCE_DEFAULT_METHOD + + method_key = method_raw.upper() + if method_key == "S256": + return PKCE_METHOD_S256 + if method_key == "PLAIN": + return PKCE_METHOD_PLAIN + return None diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index 2de53c0dd06655..48efbb9b524ec1 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import re from typing import Any, Literal from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse @@ -19,16 +18,11 @@ from sentry.users.models.user import User from sentry.users.services.user.service import user_service from sentry.utils import metrics +from sentry.utils.oauth import PKCE_METHOD_PLAIN, normalize_pkce_method, validate_code_challenge from sentry.web.frontend.auth_login import AuthLoginView logger = logging.getLogger("sentry.api.oauth_authorize") -# PKCE validation helpers -PKCE_METHOD_S256 = "S256" -PKCE_METHOD_PLAIN = "plain" -PKCE_DEFAULT_METHOD = PKCE_METHOD_PLAIN -PKCE_CODE_PATTERN = re.compile(r"^[A-Za-z0-9\-\._~]{43,128}$") - class OAuthAuthorizeView(AuthLoginView): auth_required = False @@ -163,14 +157,7 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed; # for v0, allow "plain" to avoid breakage. Spec default for missing method is "plain" (RFC 7636 §4.3). if code_challenge: - method_raw = code_challenge_method or PKCE_DEFAULT_METHOD - method_key = method_raw.upper() - if method_key == "S256": - method = PKCE_METHOD_S256 - elif method_key == "PLAIN": - method = PKCE_METHOD_PLAIN - else: - method = None + method = normalize_pkce_method(code_challenge_method) if method is None: return self.error( @@ -181,7 +168,7 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: name="invalid_request", err_response="code_challenge_method", ) - if not PKCE_CODE_PATTERN.match(code_challenge): + if not validate_code_challenge(code_challenge): return self.error( request=request, client_id=client_id, diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index 52082bc70cbd4c..8ab0b0db9724b6 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -22,12 +22,13 @@ from sentry.sentry_apps.token_exchange.util import GrantTypes from sentry.utils import json, metrics from sentry.utils.locking import UnableToAcquireLock -from sentry.web.frontend.base import control_silo_view -from sentry.web.frontend.oauth_authorize import ( - PKCE_DEFAULT_METHOD, +from sentry.utils.oauth import ( PKCE_METHOD_PLAIN, PKCE_METHOD_S256, + normalize_pkce_method, + validate_code_challenge, ) +from sentry.web.frontend.base import control_silo_view from sentry.web.frontend.openidtoken import OpenIDToken logger = logging.getLogger("sentry.api.oauth_token") @@ -233,6 +234,8 @@ def _verify_pkce( ) -> dict | None: if not grant.code_challenge: return None + if not validate_code_challenge(grant.code_challenge): + return {"error": "invalid_grant", "reason": "invalid code_challenge"} # For v0: missing verifier is allowed (log), provided one must validate if app_version < 1 and not code_verifier: @@ -254,18 +257,11 @@ def _verify_pkce( if not _PKCE_VERIFIER_RE.match(code_verifier): return {"error": "invalid_grant", "reason": "invalid code_verifier charset"} - method_raw = grant.code_challenge_method or "" - method_key = method_raw.upper() - if not method_key: - method = PKCE_DEFAULT_METHOD - elif method_key == "S256": - method = PKCE_METHOD_S256 - elif method_key == "PLAIN": - method = PKCE_METHOD_PLAIN - else: + method = normalize_pkce_method(grant.code_challenge_method) + if method is None: return {"error": "invalid_grant", "reason": "unsupported pkce method"} - if method_raw != method: + if grant.code_challenge_method != method: grant.update(code_challenge_method=method) if method == PKCE_METHOD_S256: diff --git a/tests/sentry/web/frontend/test_oauth_authorize.py b/tests/sentry/web/frontend/test_oauth_authorize.py index d0f051231fde3b..e60adafc516858 100644 --- a/tests/sentry/web/frontend/test_oauth_authorize.py +++ b/tests/sentry/web/frontend/test_oauth_authorize.py @@ -7,7 +7,12 @@ from sentry.models.apitoken import ApiToken from sentry.testutils.cases import TestCase from sentry.testutils.silo import control_silo_test -from sentry.web.frontend.oauth_authorize import PKCE_METHOD_PLAIN, PKCE_METHOD_S256 +from sentry.utils.oauth import ( + PKCE_METHOD_PLAIN, + PKCE_METHOD_S256, + normalize_pkce_method, + validate_code_challenge, +) @control_silo_test @@ -181,6 +186,21 @@ def test_pkce_rejects_invalid_challenge_charset(self) -> None: self.assertTemplateUsed("sentry/oauth-error.html") assert resp.context["error"] == "Missing or invalid code_challenge parameter." + def test_validate_code_challenge_helper(self) -> None: + assert validate_code_challenge("a" * 43) + assert validate_code_challenge("A._-~0" * 7) + assert not validate_code_challenge("short") + assert not validate_code_challenge("invalid*chars") + assert not validate_code_challenge("a" * 129) + + def test_normalize_pkce_method_helper(self) -> None: + assert normalize_pkce_method(None) == PKCE_METHOD_PLAIN + assert normalize_pkce_method("plain") == PKCE_METHOD_PLAIN + assert normalize_pkce_method("PLAIN") == PKCE_METHOD_PLAIN + assert normalize_pkce_method("S256") == PKCE_METHOD_S256 + assert normalize_pkce_method("s256") == PKCE_METHOD_S256 + assert normalize_pkce_method("unknown") is None + def test_minimal_params_deny_flow(self) -> None: self.login_as(self.user) diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 4837016e42de19..12f788bc9fac21 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -9,7 +9,7 @@ from sentry.testutils.cases import TestCase from sentry.testutils.silo import control_silo_test from sentry.utils import json -from sentry.web.frontend.oauth_authorize import PKCE_METHOD_PLAIN +from sentry.utils.oauth import PKCE_METHOD_PLAIN @control_silo_test From 49c6bfe341fea2b2600d06c26f5a8fb2be352183 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Tue, 23 Sep 2025 09:57:09 -0700 Subject: [PATCH 7/8] Dont allow pkce bypass --- src/sentry/web/frontend/oauth_token.py | 14 +---------- tests/sentry/web/frontend/test_oauth_token.py | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index 8ab0b0db9724b6..3e0b3a277a0978 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -237,19 +237,7 @@ def _verify_pkce( if not validate_code_challenge(grant.code_challenge): return {"error": "invalid_grant", "reason": "invalid code_challenge"} - # For v0: missing verifier is allowed (log), provided one must validate - if app_version < 1 and not code_verifier: - logger.warning( - "oauth.token-legacy-pkce-missing-verifier", - extra={ - "application_id": grant.application_id, - "client_id": grant.application.client_id, - "grant_id": grant.id, - }, - ) - return None - - # For v1 or when provided in v0, validate per RFC 7636 + # For v1, and for v0 once a challenge is stored, PKCE must complete. if not code_verifier: return {"error": "invalid_grant", "reason": "missing code_verifier"} if len(code_verifier) < 43 or len(code_verifier) > 128: diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 12f788bc9fac21..087d8d58a3b7ec 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -354,6 +354,31 @@ def test_pkce_missing_verifier(self) -> None: assert resp.status_code == 400 assert json.loads(resp.content) == {"error": "invalid_grant"} + def test_pkce_missing_verifier_rejected_for_legacy_app(self) -> None: + self.application.update(version=0) + self.login_as(self.user) + grant = ApiGrant.objects.create( + user=self.user, + application=self.application, + redirect_uri=self.application.get_default_redirect_uri(), + code_challenge="x" * 50, + code_challenge_method="S256", + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": self.application.get_default_redirect_uri(), + "code": grant.code, + "client_id": self.application.client_id, + "client_secret": self.client_secret, + }, + ) + + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + def test_pkce_length_and_charset(self) -> None: self.login_as(self.user) # too short From 718cdc446d9ec38c324aaedd654c85e84c7fe881 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Tue, 23 Sep 2025 10:15:35 -0700 Subject: [PATCH 8/8] Fix missing code_challenge with method --- src/sentry/web/frontend/oauth_authorize.py | 10 ++++++++++ tests/sentry/web/frontend/test_oauth_authorize.py | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/sentry/web/frontend/oauth_authorize.py b/src/sentry/web/frontend/oauth_authorize.py index 48efbb9b524ec1..b5dddc6c99be88 100644 --- a/src/sentry/web/frontend/oauth_authorize.py +++ b/src/sentry/web/frontend/oauth_authorize.py @@ -156,6 +156,16 @@ def get(self, request: HttpRequest, **kwargs) -> HttpResponseBase: # Validate PKCE inputs (when provided). For v1+ applications, only S256 is allowed; # for v0, allow "plain" to avoid breakage. Spec default for missing method is "plain" (RFC 7636 §4.3). + if code_challenge_method and not code_challenge: + return self.error( + request=request, + client_id=client_id, + response_type=response_type, + redirect_uri=redirect_uri, + name="invalid_request", + err_response="code_challenge", + ) + if code_challenge: method = normalize_pkce_method(code_challenge_method) diff --git a/tests/sentry/web/frontend/test_oauth_authorize.py b/tests/sentry/web/frontend/test_oauth_authorize.py index e60adafc516858..5b58290d374076 100644 --- a/tests/sentry/web/frontend/test_oauth_authorize.py +++ b/tests/sentry/web/frontend/test_oauth_authorize.py @@ -160,6 +160,17 @@ def test_pkce_missing_method_rejected_for_v1(self) -> None: resp.context["error"] == "Missing or invalid code_challenge_method parameter." ) + def test_pkce_method_without_challenge_rejected(self) -> None: + self.login_as(self.user) + + resp = self.client.get( + f"{self.path}?response_type=code&client_id={self.application.client_id}&code_challenge_method=S256" + ) + + assert resp.status_code == 400 + self.assertTemplateUsed("sentry/oauth-error.html") + assert resp.context["error"] == "Missing or invalid code_challenge parameter." + def test_pkce_rejects_invalid_challenge_length(self) -> None: self.login_as(self.user)