diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt index 3a8a165431ba..90b6c44c0e39 100644 --- a/migrations_lockfile.txt +++ b/migrations_lockfile.txt @@ -31,7 +31,7 @@ releases: 0004_cleanup_failed_safe_deletes replays: 0007_organizationmember_replay_access -sentry: 1019_add_integration_debug_json +sentry: 1020_alter_apiapplication_client_secret_nullable social_auth: 0003_social_auth_json_field diff --git a/src/sentry/api/authentication.py b/src/sentry/api/authentication.py index 98658956205b..d50543acab6d 100644 --- a/src/sentry/api/authentication.py +++ b/src/sentry/api/authentication.py @@ -327,7 +327,9 @@ def authenticate(self, request: Request): except ApiApplication.DoesNotExist: raise invalid_pair_error - if not constant_time_compare(application.client_secret, client_secret): + if application.client_secret is None or not constant_time_compare( + application.client_secret, client_secret + ): raise invalid_pair_error try: @@ -355,6 +357,9 @@ def get_payload_from_client_secret_jwt( raise AuthenticationFailed("Application not found") client_secret = application.client_secret + if client_secret is None: + raise AuthenticationFailed("Application does not have a client secret") + try: encoded_jwt = tokens[1] except IndexError: diff --git a/src/sentry/migrations/1020_alter_apiapplication_client_secret_nullable.py b/src/sentry/migrations/1020_alter_apiapplication_client_secret_nullable.py new file mode 100644 index 000000000000..6a4affc09cca --- /dev/null +++ b/src/sentry/migrations/1020_alter_apiapplication_client_secret_nullable.py @@ -0,0 +1,34 @@ +# Generated by Django 5.2.8 on 2026-01-20 21:29 + +from django.db import migrations, models + +import sentry.models.apiapplication +from sentry.new_migrations.migrations import CheckedMigration + + +class Migration(CheckedMigration): + # This flag is used to mark that a migration shouldn't be automatically run in production. + # This should only be used for operations where it's safe to run the migration after your + # code has deployed. So this should not be used for most operations that alter the schema + # of a table. + # Here are some things that make sense to mark as post deployment: + # - Large data migrations. Typically we want these to be run manually so that they can be + # monitored and not block the deploy for a long period of time while they run. + # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to + # run this outside deployments so that we don't block them. Note that while adding an index + # is a schema change, it's completely safe to run the operation after the code has deployed. + # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment + + is_post_deployment = False + + dependencies = [ + ("sentry", "1019_add_integration_debug_json"), + ] + + operations = [ + migrations.AlterField( + model_name="apiapplication", + name="client_secret", + field=models.TextField(null=True, default=sentry.models.apiapplication.generate_token), + ), + ] diff --git a/src/sentry/models/apiapplication.py b/src/sentry/models/apiapplication.py index 39e46a1b97aa..4d1854dc73e6 100644 --- a/src/sentry/models/apiapplication.py +++ b/src/sentry/models/apiapplication.py @@ -62,7 +62,11 @@ class ApiApplication(Model): __relocation_scope__ = RelocationScope.Global client_id = models.CharField(max_length=64, unique=True, default=generate_token) - client_secret = models.TextField(default=generate_token) + # NULL for public clients (RFC 6749 §2.1) - CLIs, native apps, SPAs + # Public clients cannot securely store secrets, so they use PKCE and/or + # refresh token rotation instead of client authentication. + # Use client_secret=None to create a public client. + client_secret = models.TextField(null=True, default=generate_token) owner = FlexibleForeignKey("sentry.User", null=True) name = models.CharField(max_length=64, blank=True, default=generate_name) status = BoundedPositiveIntegerField( @@ -135,6 +139,19 @@ def outboxes_for_update(self) -> list[ControlOutbox]: def is_active(self): return self.status == ApiApplicationStatus.active + @property + def is_public(self) -> bool: + """Check if this is a public client (RFC 6749 §2.1). + + Public clients (native apps, CLIs, SPAs) cannot securely store + credentials, so they have no client_secret. They rely on PKCE + for authorization code flow and refresh token rotation for + token refresh (RFC 9700 §4.14.2). + + Public clients are created with client_secret=None. + """ + return self.client_secret is None + def is_allowed_response_type(self, value: object) -> TypeIs[Literal["code", "token"]]: return value in ("code", "token") diff --git a/src/sentry/sentry_apps/models/sentry_app.py b/src/sentry/sentry_apps/models/sentry_app.py index 4434f101db34..88ed1d064dbd 100644 --- a/src/sentry/sentry_apps/models/sentry_app.py +++ b/src/sentry/sentry_apps/models/sentry_app.py @@ -196,6 +196,8 @@ def is_installed_on(self, organization): def build_signature(self, body): assert self.application is not None secret = self.application.client_secret + # SentryApps always have a client_secret (they are confidential clients) + assert secret is not None return hmac.new( key=secret.encode("utf-8"), msg=body.encode("utf-8"), digestmod=sha256 ).hexdigest() diff --git a/src/sentry/web/frontend/oauth_token.py b/src/sentry/web/frontend/oauth_token.py index 87765d2c0326..64df96388d14 100644 --- a/src/sentry/web/frontend/oauth_token.py +++ b/src/sentry/web/frontend/oauth_token.py @@ -147,66 +147,54 @@ def post(self, request: Request) -> HttpResponse: }, ) - # Device flow supports public clients per RFC 8628 §5.6. - # Public clients only provide client_id to identify themselves. - # If client_secret is provided, we still validate it for confidential clients. - if grant_type == GrantTypes.DEVICE_CODE: - if not client_id: - return self.error( - request=request, - name="invalid_client", - reason="missing client_id", - status=401, - ) - - # Build query - validate secret only if provided (confidential client) - query = {"client_id": client_id} - if client_secret: - query["client_secret"] = client_secret + if not client_id: + return self.error( + request=request, + name="invalid_client", + reason="missing client_id", + status=401, + ) + if client_secret: try: - application = ApiApplication.objects.get(**query) + application = ApiApplication.objects.get( + client_id=client_id, + client_secret=client_secret, + ) + is_public_client = False except ApiApplication.DoesNotExist: metrics.incr("oauth_token.post.invalid", sample_rate=1.0) - if client_secret: - logger.warning( - "Invalid client_id / secret pair", - extra={"client_id": client_id}, - ) - reason = "invalid client_id or client_secret" - else: - logger.warning("Invalid client_id", extra={"client_id": client_id}) - reason = "invalid client_id" + logger.warning( + "Invalid client_id / secret pair", + extra={"client_id": client_id}, + ) return self.error( request=request, name="invalid_client", - reason=reason, + reason="invalid client_id or client_secret", status=401, ) else: - # Other grant types require confidential client authentication - if not client_id or not client_secret: + try: + application = ApiApplication.objects.get(client_id=client_id) + except ApiApplication.DoesNotExist: + metrics.incr("oauth_token.post.invalid", sample_rate=1.0) + logger.warning("Invalid client_id", extra={"client_id": client_id}) return self.error( request=request, name="invalid_client", - reason="missing client credentials", + reason="invalid client_id or client_secret", status=401, ) - try: - # Note: We don't filter by status here to distinguish between invalid - # credentials (unknown client) and inactive applications. This allows - # proper grant cleanup per RFC 6749 §10.5 and clearer metrics. - application = ApiApplication.objects.get( - client_id=client_id, - client_secret=client_secret, - ) - except ApiApplication.DoesNotExist: - metrics.incr( - "oauth_token.post.invalid", - sample_rate=1.0, + is_public_client = application.is_public + + if not is_public_client: + metrics.incr("oauth_token.post.invalid", sample_rate=1.0) + logger.warning( + "Confidential client missing secret", + extra={"client_id": client_id}, ) - logger.warning("Invalid client_id / secret pair", extra={"client_id": client_id}) return self.error( request=request, name="invalid_client", @@ -214,6 +202,17 @@ def post(self, request: Request) -> HttpResponse: status=401, ) + if is_public_client and grant_type not in [ + GrantTypes.AUTHORIZATION, + GrantTypes.DEVICE_CODE, + GrantTypes.REFRESH, + ]: + return self.error( + request=request, + name="unauthorized_client", + reason="public clients cannot use this grant type", + ) + # Check application status separately from credential validation. # This preserves metric clarity and provides consistent error handling. if application.status != ApiApplicationStatus.active: @@ -272,8 +271,11 @@ def post(self, request: Request) -> HttpResponse: token_data = self.get_access_tokens(request=request, application=application) elif grant_type == GrantTypes.DEVICE_CODE: return self.handle_device_code_grant(request=request, application=application) - else: + elif grant_type == GrantTypes.REFRESH: token_data = self.get_refresh_token(request=request, application=application) + else: + # Should not reach here due to earlier grant_type validation + return self.error(request=request, name="unsupported_grant_type") if "error" in token_data: return self.error( request=request, @@ -581,7 +583,7 @@ def handle_device_code_grant( # are atomic. This prevents duplicate tokens if delete fails after # token creation succeeds. with transaction.atomic(router.db_for_write(ApiToken)): - # Create the access token + # Create the access token for the device flow token = ApiToken.objects.create( application=application, user_id=device_code.user.id, diff --git a/tests/sentry/api/endpoints/test_api_application_rotate_secrets.py b/tests/sentry/api/endpoints/test_api_application_rotate_secrets.py index 664aee7fe981..ed24ba45bfb8 100644 --- a/tests/sentry/api/endpoints/test_api_application_rotate_secrets.py +++ b/tests/sentry/api/endpoints/test_api_application_rotate_secrets.py @@ -36,6 +36,7 @@ def test_invalid_app_id(self) -> None: def test_valid_call(self) -> None: self.login_as(self.user) old_secret = self.app.client_secret + assert old_secret is not None response = self.client.post(self.path, data={}) new_secret = response.data["clientSecret"] assert len(new_secret) == len(old_secret) diff --git a/tests/sentry/models/test_apiapplication.py b/tests/sentry/models/test_apiapplication.py index 8e63ebec7665..ca7b0aed103f 100644 --- a/tests/sentry/models/test_apiapplication.py +++ b/tests/sentry/models/test_apiapplication.py @@ -5,6 +5,25 @@ @control_silo_test class ApiApplicationTest(TestCase): + def test_is_public_with_null_secret(self) -> None: + """Public clients are created with client_secret=None (NULL in DB).""" + app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="http://example.com", + client_secret=None, + ) + assert app.is_public is True + + def test_is_public_with_secret(self) -> None: + """Confidential clients have a client_secret.""" + app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="http://example.com", + # client_secret defaults to a generated token + ) + assert app.client_secret is not None + assert app.is_public is False + def test_is_valid_redirect_uri(self) -> None: app = ApiApplication.objects.create( owner=self.user, diff --git a/tests/sentry/sentry_apps/api/endpoints/test_sentry_app_rotate_secret.py b/tests/sentry/sentry_apps/api/endpoints/test_sentry_app_rotate_secret.py index 6964cceb3534..b05c2de3a50d 100644 --- a/tests/sentry/sentry_apps/api/endpoints/test_sentry_app_rotate_secret.py +++ b/tests/sentry/sentry_apps/api/endpoints/test_sentry_app_rotate_secret.py @@ -84,6 +84,7 @@ def test_valid_call(self) -> None: self.login_as(self.user) assert self.sentry_app.application is not None old_secret = self.sentry_app.application.client_secret + assert old_secret is not None response = self.client.post(self.url) new_secret = response.data["clientSecret"] assert len(new_secret) == len(old_secret) @@ -94,6 +95,7 @@ def test_superuser_has_access(self) -> None: self.login_as(user=superuser, superuser=True) assert self.sentry_app.application is not None old_secret = self.sentry_app.application.client_secret + assert old_secret is not None response = self.client.post(self.url) new_secret = response.data["clientSecret"] assert len(new_secret) == len(old_secret) diff --git a/tests/sentry/web/frontend/test_oauth_token.py b/tests/sentry/web/frontend/test_oauth_token.py index 46f4aa9266a4..a435b1d9d475 100644 --- a/tests/sentry/web/frontend/test_oauth_token.py +++ b/tests/sentry/web/frontend/test_oauth_token.py @@ -1083,6 +1083,129 @@ def test_pkce_verifier_too_long(self) -> None: assert not ApiGrant.objects.filter(id=grant.id).exists() +@control_silo_test +class OAuthTokenPublicClientAuthCodeTest(TestCase): + """Tests for public client authorization_code flow. + + Public clients (RFC 6749 §2.1) cannot securely store client_secret, + so they must use PKCE (RFC 7636) for the authorization_code flow. + """ + + @cached_property + def path(self) -> str: + return "/oauth/token/" + + def setUp(self) -> None: + super().setUp() + # Create a public client (no client_secret) + self.public_application = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + client_secret=None, # Public client + ) + assert self.public_application.is_public is True + + def test_public_client_authorization_code_with_pkce_succeeds(self) -> None: + """Public client can use authorization_code with PKCE.""" + code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk" + code_challenge = "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM" + + grant = ApiGrant.objects.create( + user=self.user, + application=self.public_application, + redirect_uri="https://example.com", + code_challenge=code_challenge, + code_challenge_method="S256", + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": "https://example.com", + "code": grant.code, + "code_verifier": code_verifier, + "client_id": self.public_application.client_id, + # No client_secret - public client + }, + ) + + assert resp.status_code == 200 + data = json.loads(resp.content) + assert "access_token" in data + assert data["token_type"] == "Bearer" + + def test_public_client_authorization_code_without_pkce_succeeds(self) -> None: + """Public client can use authorization_code without PKCE (currently allowed). + + NOTE: RFC 9700 (OAuth 2.0 Security BCP) recommends that public clients + SHOULD use PKCE for authorization_code flow. However, this is currently + not enforced to maintain backward compatibility. The PKCE requirement + is enforced at the authorization endpoint level (during grant creation), + not at the token endpoint level. + + Future enhancement: Consider requiring PKCE for public clients at the + authorization endpoint (when creating the grant). + """ + # Create grant without PKCE challenge + grant = ApiGrant.objects.create( + user=self.user, + application=self.public_application, + redirect_uri="https://example.com", + # No code_challenge - no PKCE + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": "https://example.com", + "code": grant.code, + "client_id": self.public_application.client_id, + # No client_secret - public client + # No code_verifier - no PKCE + }, + ) + + # Currently succeeds - PKCE is optional for backward compatibility + assert resp.status_code == 200 + data = json.loads(resp.content) + assert "access_token" in data + + def test_public_client_with_wrong_secret_fails(self) -> None: + """Providing a wrong secret for a public client should fail. + + Public clients (created with client_secret=None) have no secret, so any + provided secret will not match and authentication should fail. + """ + code_verifier = "dBjftJeZ4CVP-mB92K27uhbUJU1p1r_wW1gFWFOEjXk" + code_challenge = "E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM" + + grant = ApiGrant.objects.create( + user=self.user, + application=self.public_application, + redirect_uri="https://example.com", + code_challenge=code_challenge, + code_challenge_method="S256", + ) + + resp = self.client.post( + self.path, + { + "grant_type": "authorization_code", + "redirect_uri": "https://example.com", + "code": grant.code, + "code_verifier": code_verifier, + "client_id": self.public_application.client_id, + "client_secret": "some_random_secret", + }, + ) + + assert resp.status_code == 401 + data = json.loads(resp.content) + assert data["error"] == "invalid_client" + + @control_silo_test class OAuthTokenDeviceCodeTest(TestCase): """Tests for device code grant type (RFC 8628 §3.4/§3.5).""" @@ -1372,17 +1495,27 @@ def test_public_client_success(self) -> None: securely store credentials. They should be able to authenticate with just client_id. """ - self.device_code.status = self.DeviceCodeStatus.APPROVED - self.device_code.user = self.user - self.device_code.save() + # Create a public application (no client_secret) + public_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="http://example.com", + client_secret=None, # Public client + ) + public_device_code = ApiDeviceCode.objects.create( + application=public_app, + user_code="ABCD-1234", + scope_list=["project:read"], + status=self.DeviceCodeStatus.APPROVED, + user=self.user, + ) # Request without client_secret (public client) resp = self.client.post( self.path, { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", - "device_code": self.device_code.device_code, - "client_id": self.application.client_id, + "device_code": public_device_code.device_code, + "client_id": public_app.client_id, # No client_secret - this is a public client }, ) @@ -1395,12 +1528,12 @@ def test_public_client_success(self) -> None: assert data["user"]["id"] == str(self.user.id) # Device code should be deleted - assert not ApiDeviceCode.objects.filter(id=self.device_code.id).exists() + assert not ApiDeviceCode.objects.filter(id=public_device_code.id).exists() # Token should be created token = ApiToken.objects.get(token=data["access_token"]) assert token.user == self.user - assert token.application == self.application + assert token.application == public_app def test_public_client_invalid_client_id(self) -> None: """Public clients with invalid client_id should return invalid_client.""" @@ -1435,14 +1568,25 @@ def test_public_client_missing_client_id(self) -> None: def test_public_client_authorization_pending(self) -> None: """Public client polling pending device code should return authorization_pending.""" - assert self.device_code.status == self.DeviceCodeStatus.PENDING + # Create a public application (no client_secret) + public_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="http://example.com", + client_secret=None, # Public client + ) + public_device_code = ApiDeviceCode.objects.create( + application=public_app, + user_code="PEND-1234", + scope_list=["project:read"], + status=self.DeviceCodeStatus.PENDING, # Pending status + ) resp = self.client.post( self.path, { "grant_type": "urn:ietf:params:oauth:grant-type:device_code", - "device_code": self.device_code.device_code, - "client_id": self.application.client_id, + "device_code": public_device_code.device_code, + "client_id": public_app.client_id, # No client_secret - public client }, ) @@ -1450,7 +1594,7 @@ def test_public_client_authorization_pending(self) -> None: assert json.loads(resp.content) == {"error": "authorization_pending"} # Device code should still exist - assert ApiDeviceCode.objects.filter(id=self.device_code.id).exists() + assert ApiDeviceCode.objects.filter(id=public_device_code.id).exists() def test_confidential_client_wrong_secret_rejected(self) -> None: """Device flow with wrong client_secret should be rejected. @@ -1474,3 +1618,347 @@ def test_confidential_client_wrong_secret_rejected(self) -> None: ) assert resp.status_code == 401 assert json.loads(resp.content) == {"error": "invalid_client"} + + +@control_silo_test +class OAuthTokenPublicClientRefreshTest(TestCase): + """Tests for public client refresh token rotation. + + Public clients (CLIs, native apps) cannot securely store client_secret, + so they use refresh token rotation instead of client authentication. + Each refresh issues a new refresh token and deletes the old one. + """ + + @cached_property + def path(self) -> str: + return "/oauth/token/" + + def setUp(self) -> None: + super().setUp() + # Create a public client (no client_secret) + self.public_application = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + client_secret=None, # Public client + ) + + # Create a token for the public client + self.token = ApiToken.objects.create( + application=self.public_application, + user=self.user, + ) + # Store the original refresh token for tests + self.original_refresh_token = self.token.refresh_token + + def test_public_client_can_refresh_without_secret(self) -> None: + """Public clients should be able to refresh tokens without client_secret.""" + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + # No client_secret - this is a public client + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + assert "access_token" in data + assert "refresh_token" in data + assert data["token_type"] == "Bearer" + + def test_refresh_rotation_issues_new_tokens(self) -> None: + """Refresh should issue new access and refresh tokens (in-place update).""" + old_token_id = self.token.id + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + + # New tokens should be different from original + assert data["access_token"] != self.token.token + assert data["refresh_token"] != self.original_refresh_token + + # Same token record is updated in-place (not deleted and recreated) + assert ApiToken.objects.filter(id=old_token_id).exists() + self.token.refresh_from_db() + assert self.token.token == data["access_token"] + assert self.token.refresh_token == data["refresh_token"] + + def test_old_refresh_token_cannot_be_reused(self) -> None: + """Using an already-rotated refresh token should fail.""" + # First refresh - should succeed + resp1 = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + }, + ) + assert resp1.status_code == 200 + + # Try to use the old refresh token again - should fail + resp2 = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + }, + ) + assert resp2.status_code == 400 + assert json.loads(resp2.content) == {"error": "invalid_grant"} + + def test_confidential_client_refresh_requires_secret(self) -> None: + """Confidential clients must still provide client_secret for refresh.""" + # Create a confidential client (has client_secret) + confidential_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + # client_secret is auto-generated + ) + token = ApiToken.objects.create( + application=confidential_app, + user=self.user, + ) + + # Try to refresh without secret - should fail + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": token.refresh_token, + "client_id": confidential_app.client_id, + # Missing client_secret + }, + ) + assert resp.status_code == 401 + assert json.loads(resp.content) == {"error": "invalid_client"} + + def test_confidential_client_refresh_with_secret_works(self) -> None: + """Confidential clients can refresh with valid client_secret (existing behavior).""" + # Create a confidential client + confidential_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + ) + client_secret = confidential_app.client_secret + token = ApiToken.objects.create( + application=confidential_app, + user=self.user, + ) + original_refresh = token.refresh_token + + # Refresh with secret - should succeed + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": original_refresh, + "client_id": confidential_app.client_id, + "client_secret": client_secret, + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + assert "access_token" in data + # Confidential client uses in-place refresh, same token object + token.refresh_from_db() + assert token.token == data["access_token"] + + def test_confidential_client_refresh_with_wrong_secret_fails(self) -> None: + """Confidential clients with incorrect client_secret should fail.""" + confidential_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + ) + token = ApiToken.objects.create( + application=confidential_app, + user=self.user, + ) + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": token.refresh_token, + "client_id": confidential_app.client_id, + "client_secret": "wrong_secret", + }, + ) + assert resp.status_code == 401 + assert json.loads(resp.content) == {"error": "invalid_client"} + + def test_confidential_client_refresh_with_empty_secret_fails(self) -> None: + """Sending client_secret='' should NOT authenticate a confidential client. + + This tests that an empty string secret doesn't bypass authentication + by being treated as "no secret provided" for a public client. + """ + confidential_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + ) + token = ApiToken.objects.create( + application=confidential_app, + user=self.user, + ) + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": token.refresh_token, + "client_id": confidential_app.client_id, + "client_secret": "", # Empty string, not missing + }, + ) + assert resp.status_code == 401 + assert json.loads(resp.content) == {"error": "invalid_client"} + + def test_confidential_client_refresh_rotates_in_place(self) -> None: + """Confidential clients rotate tokens in-place (same DB record). + + Both confidential and public clients rotate refresh tokens, but: + - Confidential: updates the same token record (in-place rotation) + - Public: creates new token record, deletes old one + + This test verifies the confidential client behavior. + """ + confidential_app = ApiApplication.objects.create( + owner=self.user, + redirect_uris="https://example.com", + ) + client_secret = confidential_app.client_secret + token = ApiToken.objects.create( + application=confidential_app, + user=self.user, + ) + original_token_id = token.id + original_refresh_token = token.refresh_token + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": original_refresh_token, + "client_id": confidential_app.client_id, + "client_secret": client_secret, + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + + # Same token object should still exist (in-place update) + assert ApiToken.objects.filter(id=original_token_id).exists() + + # Refresh token IS rotated (new value) + token.refresh_from_db() + assert token.refresh_token != original_refresh_token + assert token.refresh_token == data["refresh_token"] + + # Old refresh token can NOT be reused + resp2 = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": original_refresh_token, + "client_id": confidential_app.client_id, + "client_secret": client_secret, + }, + ) + assert resp2.status_code == 400 + assert json.loads(resp2.content) == {"error": "invalid_grant"} + + def test_refresh_preserves_scopes(self) -> None: + """Refreshed token should have the same scopes as the original.""" + self.token.scope_list = ["project:read", "org:read"] + self.token.save() + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + new_token = ApiToken.objects.get(token=data["access_token"]) + + assert set(new_token.get_scopes()) == {"project:read", "org:read"} + + def test_refresh_preserves_organization_scope(self) -> None: + """Refreshed token should have the same organization scope.""" + organization = self.create_organization(owner=self.user) + self.token.scoping_organization_id = organization.id + self.token.save() + + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + }, + ) + assert resp.status_code == 200 + + data = json.loads(resp.content) + assert data["organization_id"] == str(organization.id) + + new_token = ApiToken.objects.get(token=data["access_token"]) + assert new_token.scoping_organization_id == organization.id + + def test_invalid_refresh_token_returns_error(self) -> None: + """Invalid refresh token should return invalid_grant.""" + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": "invalid_token", + "client_id": self.public_application.client_id, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_grant"} + + def test_missing_refresh_token_returns_error(self) -> None: + """Missing refresh_token should return invalid_request.""" + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "client_id": self.public_application.client_id, + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_request"} + + def test_scope_changes_not_supported(self) -> None: + """Scope parameter on refresh is not supported.""" + resp = self.client.post( + self.path, + { + "grant_type": "refresh_token", + "refresh_token": self.original_refresh_token, + "client_id": self.public_application.client_id, + "scope": "project:write", # Trying to change scope + }, + ) + assert resp.status_code == 400 + assert json.loads(resp.content) == {"error": "invalid_request"}