diff --git a/backend/api_v2/admin.py b/backend/api_v2/admin.py index 37f0837a74..1e0b497e0a 100644 --- a/backend/api_v2/admin.py +++ b/backend/api_v2/admin.py @@ -1,5 +1,37 @@ from django.contrib import admin -from .models import APIDeployment, APIKey +from .models import APIDeployment, APIKey, OrganizationRateLimit + + +@admin.register(OrganizationRateLimit) +class OrganizationRateLimitAdmin(admin.ModelAdmin): + list_display = [ + "organization", + "concurrent_request_limit", + "created_at", + "modified_at", + ] + list_filter = ["created_at", "modified_at"] + search_fields = ["organization__name", "organization__organization_id"] + readonly_fields = ["created_at", "modified_at"] + fieldsets = ( + ( + None, + { + "fields": ( + "organization", + "concurrent_request_limit", + ) + }, + ), + ( + "Timestamps", + { + "fields": ("created_at", "modified_at"), + "classes": ("collapse",), + }, + ), + ) + admin.site.register([APIDeployment, APIKey]) diff --git a/backend/api_v2/api_deployment_views.py b/backend/api_v2/api_deployment_views.py index 4178b5f2ea..4767cb1a62 100644 --- a/backend/api_v2/api_deployment_views.py +++ b/backend/api_v2/api_deployment_views.py @@ -1,5 +1,6 @@ import json import logging +import uuid from typing import Any from configuration.models import Configuration @@ -20,8 +21,9 @@ from api_v2.constants import ApiExecution from api_v2.deployment_helper import DeploymentHelper from api_v2.dto import DeploymentExecutionDTO -from api_v2.exceptions import NoActiveAPIKeyError +from api_v2.exceptions import NoActiveAPIKeyError, RateLimitExceeded from api_v2.models import APIDeployment +from api_v2.rate_limiter import APIDeploymentRateLimiter from api_v2.serializers import ( APIDeploymentListSerializer, APIDeploymentSerializer, @@ -67,6 +69,7 @@ def post( ) -> Response: api: APIDeployment = deployment_execution_dto.api api_key: str = deployment_execution_dto.api_key + organization = api.organization serializer = ExecutionRequestSerializer( data=request.data, context={"api": api, "api_key": api_key} @@ -87,21 +90,48 @@ def post( if presigned_urls: DeploymentHelper.load_presigned_files(presigned_urls, file_objs) - response = DeploymentHelper.execute_workflow( - organization_name=org_name, - api=api, - file_objs=file_objs, - timeout=timeout, - include_metadata=include_metadata, - include_metrics=include_metrics, - use_file_history=use_file_history, - tag_names=tag_names, - llm_profile_id=llm_profile_id, - hitl_queue_name=hitl_queue_name, - hitl_packet_id=hitl_packet_id, - custom_data=custom_data, - request_headers=dict(request.headers), + # Generate execution ID for rate limiting + execution_id = str(uuid.uuid4()) + + # Check and acquire rate limit slot + can_proceed, limit_info = APIDeploymentRateLimiter.check_and_acquire( + organization, execution_id ) + if not can_proceed: + logger.warning( + f"Rate limit exceeded for org {organization.organization_id}: {limit_info}" + ) + raise RateLimitExceeded( + current_usage=limit_info["current_usage"], + limit=limit_info["limit"], + limit_type=limit_info["limit_type"], + ) + + # Execute workflow with blanket exception handling + try: + response = DeploymentHelper.execute_workflow( + organization_name=org_name, + api=api, + file_objs=file_objs, + timeout=timeout, + include_metadata=include_metadata, + include_metrics=include_metrics, + use_file_history=use_file_history, + tag_names=tag_names, + llm_profile_id=llm_profile_id, + hitl_queue_name=hitl_queue_name, + hitl_packet_id=hitl_packet_id, + custom_data=custom_data, + request_headers=dict(request.headers), + execution_id=execution_id, + ) + except Exception as error: + # Release slot on any failure during workflow setup/execution + APIDeploymentRateLimiter.release_slot(organization, execution_id) + logger.exception(f"Workflow execution failed: {error}") + raise + + # Success - signal will handle slot release when workflow completes if "error" in response and response["error"]: logger.error("API deployment execution failed") return Response( diff --git a/backend/api_v2/deployment_helper.py b/backend/api_v2/deployment_helper.py index f808ffc4f3..bfbff58b7b 100644 --- a/backend/api_v2/deployment_helper.py +++ b/backend/api_v2/deployment_helper.py @@ -1,4 +1,5 @@ import logging +import uuid from io import BytesIO from typing import Any from urllib.parse import urlencode, urlparse @@ -33,6 +34,7 @@ ) from api_v2.key_helper import KeyHelper from api_v2.models import APIDeployment, APIKey +from api_v2.rate_limiter import APIDeploymentRateLimiter from api_v2.serializers import APIExecutionResponseSerializer from api_v2.utils import APIDeploymentUtils @@ -158,6 +160,7 @@ def execute_workflow( hitl_packet_id: str | None = None, custom_data: dict[str, Any] | None = None, request_headers=None, + execution_id: str | None = None, ) -> ReturnDict: """Execute workflow by api. @@ -172,10 +175,20 @@ def execute_workflow( hitl_queue_name (str, optional): Custom queue name for manual review hitl_packet_id (str, optional): Packet ID for packet-based review custom_data (dict[str, Any], optional): JSON data for custom_data variable replacement in prompts + execution_id (str, optional): Pre-generated execution ID for rate limiting. + If None, a new UUID will be generated. Returns: ReturnDict: execution status/ result + + Note: + Rate limiting is handled at the view layer. This method should be called + after rate limit checks have passed, with a pre-acquired execution_id. """ + # Use provided execution_id or generate one (for backward compatibility) + if execution_id is None: + execution_id = str(uuid.uuid4()) + workflow_id = api.workflow.id pipeline_id = api.id if hitl_queue_name: @@ -186,6 +199,7 @@ def execute_workflow( workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution( workflow_id=workflow_id, pipeline_id=pipeline_id, + execution_id=execution_id, mode=WorkflowExecution.Mode.QUEUE, tags=tags, total_files=len(file_objs), @@ -264,6 +278,10 @@ def execute_workflow( if not include_metrics: result.remove_result_metrics() except Exception as error: + # Release rate limit slot (workflow setup/dispatch failed, async job not started) + APIDeploymentRateLimiter.release_slot(api.organization, str(execution_id)) + + # Clean up storage DestinationConnector.delete_api_storage_dir( workflow_id=workflow_id, execution_id=execution_id ) diff --git a/backend/api_v2/exceptions.py b/backend/api_v2/exceptions.py index 7de9a20abf..7dab3c1789 100644 --- a/backend/api_v2/exceptions.py +++ b/backend/api_v2/exceptions.py @@ -58,6 +58,37 @@ def __init__( super().__init__(detail, code) +class RateLimitExceeded(APIException): + status_code = 429 + default_detail = "Rate limit exceeded" + + def __init__( + self, + current_usage: int = 0, + limit: int = 0, + limit_type: str = "organization", + detail: str | None = None, + code: str | None = None, + ): + from api_v2.rate_limit_constants import RateLimitMessages + + self.current_usage = current_usage + self.limit = limit + self.limit_type = limit_type + + if detail is None: + if limit_type == "organization": + detail = RateLimitMessages.get_org_limit_exceeded_message( + current_usage=current_usage, limit=limit + ) + else: + detail = RateLimitMessages.get_global_limit_exceeded_message( + current_usage=current_usage, limit=limit + ) + + super().__init__(detail, code) + + class PresignedURLFetchError(APIException): default_detail = "Failed to fetch file from presigned URL" diff --git a/backend/api_v2/management/__init__.py b/backend/api_v2/management/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/api_v2/management/commands/__init__.py b/backend/api_v2/management/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/api_v2/management/commands/clear_org_rate_limit_cache.py b/backend/api_v2/management/commands/clear_org_rate_limit_cache.py new file mode 100644 index 0000000000..b2010b5d95 --- /dev/null +++ b/backend/api_v2/management/commands/clear_org_rate_limit_cache.py @@ -0,0 +1,158 @@ +from account_v2.models import Organization +from django.core.cache import cache +from django.core.management.base import BaseCommand + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limit_constants import RateLimitKeys + + +class Command(BaseCommand): + help = ( + "Clear rate limit cache for organizations (useful after changing default limit)" + ) + + def add_arguments(self, parser): + parser.add_argument( + "--org-id", + type=str, + help="Clear cache for specific organization ID or name (default: all orgs)", + ) + parser.add_argument( + "--all", + action="store_true", + help="Clear cache for ALL organizations (with or without custom limits)", + ) + + def handle(self, *args, **options): + org_id = options.get("org_id") + clear_all = options["all"] + + if org_id: + # Clear cache for specific organization + self._clear_org_cache(org_id) + elif clear_all: + # Clear cache for ALL organizations + self._clear_all_orgs_cache() + else: + # Clear cache for organizations with custom limits + self._clear_custom_limits_cache() + + def _clear_org_cache(self, org_id: str): + """Clear cache for a specific organization.""" + # Get organization + try: + organization = Organization.objects.get(organization_id=org_id) + except Organization.DoesNotExist: + try: + organization = Organization.objects.get(name=org_id) + except Organization.DoesNotExist: + self.stdout.write(self.style.ERROR(f'Organization "{org_id}" not found')) + return + + cache_key = RateLimitKeys.get_org_limit_cache_key( + str(organization.organization_id) + ) + cache.delete(cache_key) + + self.stdout.write( + self.style.SUCCESS( + f"✓ Cleared cache for {organization.name} ({organization.organization_id})" + ) + ) + + def _clear_custom_limits_cache(self): + """Clear cache for organizations with custom rate limits.""" + org_limits = OrganizationRateLimit.objects.select_related("organization").all() + + if not org_limits: + self.stdout.write( + self.style.WARNING("No custom rate limits found - nothing to clear") + ) + return + + count = 0 + for org_limit in org_limits: + org_id = str(org_limit.organization.organization_id) + cache_key = RateLimitKeys.get_org_limit_cache_key(org_id) + cache.delete(cache_key) + count += 1 + + self.stdout.write( + self.style.SUCCESS( + f"✓ Cleared cache for {count} organizations with custom limits" + ) + ) + + def _clear_all_orgs_cache(self): + """Clear cache for ALL organizations (with or without custom limits).""" + self.stdout.write( + self.style.WARNING( + "Clearing cache for ALL organizations (including those using defaults)..." + ) + ) + + # Try pattern-based deletion first (works with Redis cache backend) + if self._try_pattern_delete(): + self.stdout.write( + self.style.SUCCESS( + "✓ Cleared all organization rate limit caches using pattern deletion" + ) + ) + else: + # Fallback: iterate through all organizations + self._clear_all_orgs_individually() + + self.stdout.write( + self.style.WARNING( + "Note: Cache will be repopulated on next API request for each org" + ) + ) + + def _try_pattern_delete(self) -> bool: + """Try to delete cache keys using pattern (Redis-specific). + + Returns: + True if pattern deletion succeeded, False if not supported + """ + try: + # Check if cache backend supports delete_pattern (Redis cache) + if hasattr(cache, "delete_pattern"): + pattern = RateLimitKeys.ORG_LIMIT_CACHE_KEY_PATTERN.replace( + "{org_id}", "*" + ) + deleted_count = cache.delete_pattern(pattern) + self.stdout.write(f"Deleted {deleted_count} cache keys using pattern") + return True + return False + except Exception as e: + self.stdout.write(self.style.WARNING(f"Pattern deletion failed: {e}")) + return False + + def _clear_all_orgs_individually(self): + """Fallback: Clear cache by iterating through all organizations.""" + organizations = Organization.objects.all() + count = organizations.count() + + if count == 0: + self.stdout.write(self.style.WARNING("No organizations found")) + return + + # Confirm for large number of orgs + if count > 50: + self.stdout.write( + f"This will clear cache for {count} organizations individually." + ) + confirm = input("Continue? [y/N]: ") + if confirm.lower() != "y": + self.stdout.write(self.style.WARNING("Cancelled")) + return + + cleared = 0 + for org in organizations: + cache_key = RateLimitKeys.get_org_limit_cache_key(str(org.organization_id)) + cache.delete(cache_key) + cleared += 1 + + self.stdout.write( + self.style.SUCCESS(f"✓ Cleared cache for {cleared} organizations") + ) diff --git a/backend/api_v2/management/commands/delete_org_rate_limit.py b/backend/api_v2/management/commands/delete_org_rate_limit.py new file mode 100644 index 0000000000..dbd9de859b --- /dev/null +++ b/backend/api_v2/management/commands/delete_org_rate_limit.py @@ -0,0 +1,102 @@ +from account_v2.models import Organization +from django.core.management.base import BaseCommand, CommandError + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limiter import APIDeploymentRateLimiter + + +class Command(BaseCommand): + help = "Delete custom organization rate limit (reverts to default)" + + def add_arguments(self, parser): + parser.add_argument("org_id", type=str, help="Organization ID or name") + parser.add_argument( + "--force", + action="store_true", + help="Skip confirmation prompt", + ) + + def handle(self, *args, **options): + org_id = options["org_id"] + force = options["force"] + + # Get organization (try organization_id first, then name) + try: + organization = Organization.objects.get(organization_id=org_id) + except Organization.DoesNotExist: + try: + organization = Organization.objects.get(name=org_id) + except Organization.DoesNotExist: + raise CommandError(f'Organization "{org_id}" not found') + + # Check if custom limit exists + try: + org_rate_limit = OrganizationRateLimit.objects.get(organization=organization) + except OrganizationRateLimit.DoesNotExist: + self.stdout.write( + self.style.WARNING( + f'No custom rate limit found for organization "{organization.name}" ' + f"({organization.organization_id})" + ) + ) + return + + current_limit = org_rate_limit.concurrent_request_limit + + # Confirm deletion unless --force + if not force: + self.stdout.write( + f"Organization: {organization.name} ({organization.organization_id})" + ) + self.stdout.write(f"Current custom limit: {current_limit}") + self.stdout.write( + "\nThis will delete the custom rate limit and revert to the system default." + ) + + confirm = input("Continue? [y/N]: ") + if confirm.lower() != "y": + self.stdout.write(self.style.WARNING("Cancelled")) + return + + # Delete the custom limit (cache is automatically cleared via post_delete signal) + org_rate_limit.delete() + + self.stdout.write( + self.style.SUCCESS( + f'Deleted custom rate limit for organization "{organization.name}" ' + f"({organization.organization_id})" + ) + ) + + # Show what default will be used + from django.conf import settings + + from api_v2.rate_limit_constants import RateLimitDefaults + + default_limit = getattr( + settings, + "API_DEPLOYMENT_DEFAULT_RATE_LIMIT", + RateLimitDefaults.DEFAULT_ORG_LIMIT, + ) + self.stdout.write( + self.style.WARNING(f"Will now use system default: {default_limit}") + ) + + # Show current usage + try: + usage = APIDeploymentRateLimiter.get_current_usage(organization) + self.stdout.write( + f'\nCurrent usage: {usage["org_count"]}/{default_limit} concurrent requests' + ) + + if usage["org_count"] >= default_limit: + self.stdout.write( + self.style.ERROR( + "WARNING: Current usage exceeds default limit! " + "New requests will be rate limited until usage drops." + ) + ) + except Exception as e: + self.stdout.write(self.style.WARNING(f"Could not fetch current usage: {e}")) + + self.stdout.write(self.style.SUCCESS("✓ Cache automatically cleared")) diff --git a/backend/api_v2/management/commands/get_org_rate_limit.py b/backend/api_v2/management/commands/get_org_rate_limit.py new file mode 100644 index 0000000000..ab93073a30 --- /dev/null +++ b/backend/api_v2/management/commands/get_org_rate_limit.py @@ -0,0 +1,95 @@ +from account_v2.models import Organization +from django.conf import settings +from django.core.cache import cache +from django.core.management.base import BaseCommand, CommandError + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limit_constants import RateLimitKeys +from api_v2.rate_limiter import APIDeploymentRateLimiter + + +class Command(BaseCommand): + help = "View organization rate limit information and current usage" + + def add_arguments(self, parser): + parser.add_argument("org_id", type=str, help="Organization ID or name") + parser.add_argument( + "--clear-cache", + action="store_true", + help="Clear cache and force refresh from DB", + ) + + def handle(self, *args, **options): + org_id = options["org_id"] + clear_cache = options["clear_cache"] + + # Get organization (try organization_id first, then name) + try: + organization = Organization.objects.get(organization_id=org_id) + except Organization.DoesNotExist: + try: + organization = Organization.objects.get(name=org_id) + except Organization.DoesNotExist: + raise CommandError(f'Organization "{org_id}" not found') + + org_uuid = str(organization.organization_id) + + # Clear cache if requested + if clear_cache: + cache_key = RateLimitKeys.get_org_limit_cache_key(org_uuid) + cache.delete(cache_key) + self.stdout.write(self.style.WARNING("Cache cleared")) + + # Get from DB + try: + org_limit = OrganizationRateLimit.objects.get(organization=organization) + db_limit = org_limit.concurrent_request_limit + self.stdout.write(f"Database Limit: {db_limit}") + self.stdout.write(f"Last Modified: {org_limit.modified_at}") + except OrganizationRateLimit.DoesNotExist: + self.stdout.write("Database Limit: Not set") + db_limit = settings.API_DEPLOYMENT_DEFAULT_RATE_LIMIT + self.stdout.write(f"Using Default: {db_limit}") + + # Check cache status + cache_key = RateLimitKeys.get_org_limit_cache_key(org_uuid) + cached_limit = cache.get(cache_key) + if cached_limit is not None: + self.stdout.write(f"Cached Limit: {cached_limit} ✓") + else: + self.stdout.write("Cached Limit: Not cached (will be cached on next request)") + + # Get current usage + try: + usage = APIDeploymentRateLimiter.get_current_usage(organization) + + self.stdout.write("\n--- Current Usage ---") + self.stdout.write( + f'Organization: {usage["org_count"]}/{usage["org_limit"]} concurrent requests' + ) + self.stdout.write( + f'Global System: {usage["global_count"]}/{usage["global_limit"]} concurrent requests' + ) + + # Usage percentage + org_pct = ( + (usage["org_count"] / usage["org_limit"] * 100) + if usage["org_limit"] > 0 + else 0 + ) + + if org_pct >= 90: + self.stdout.write( + self.style.ERROR(f"⚠ Organization at {org_pct:.1f}% capacity") + ) + elif org_pct >= 70: + self.stdout.write( + self.style.WARNING(f"Organization at {org_pct:.1f}% capacity") + ) + else: + self.stdout.write( + self.style.SUCCESS(f"Organization at {org_pct:.1f}% capacity") + ) + + except Exception as e: + self.stdout.write(self.style.ERROR(f"Error fetching usage: {e}")) diff --git a/backend/api_v2/management/commands/list_org_rate_limits.py b/backend/api_v2/management/commands/list_org_rate_limits.py new file mode 100644 index 0000000000..7c2d9284b6 --- /dev/null +++ b/backend/api_v2/management/commands/list_org_rate_limits.py @@ -0,0 +1,45 @@ +from django.core.management.base import BaseCommand + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limiter import APIDeploymentRateLimiter + + +class Command(BaseCommand): + help = "List all organization rate limits" + + def add_arguments(self, parser): + parser.add_argument( + "--with-usage", + action="store_true", + help="Include current usage statistics (slower)", + ) + + def handle(self, *args, **options): + with_usage = options["with_usage"] + + org_limits = OrganizationRateLimit.objects.select_related("organization").all() + + if not org_limits: + self.stdout.write("No custom rate limits configured") + return + + self.stdout.write(f"Found {org_limits.count()} custom rate limits:\n") + + for org_limit in org_limits: + org = org_limit.organization + limit = org_limit.concurrent_request_limit + + self.stdout.write(f"• {org.name} ({org.organization_id})") + self.stdout.write(f" Limit: {limit}") + + if with_usage: + try: + usage = APIDeploymentRateLimiter.get_current_usage(org) + pct = (usage["org_count"] / limit * 100) if limit > 0 else 0 + self.stdout.write( + f' Usage: {usage["org_count"]}/{limit} ({pct:.1f}%)' + ) + except Exception as e: + self.stdout.write(f" Usage: Error - {e}") + + self.stdout.write("") diff --git a/backend/api_v2/management/commands/set_org_rate_limit.py b/backend/api_v2/management/commands/set_org_rate_limit.py new file mode 100644 index 0000000000..6652ea775b --- /dev/null +++ b/backend/api_v2/management/commands/set_org_rate_limit.py @@ -0,0 +1,72 @@ +from account_v2.models import Organization +from django.core.management.base import BaseCommand, CommandError + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limiter import APIDeploymentRateLimiter + + +class Command(BaseCommand): + help = "Set or update organization rate limit for API deployments" + + def add_arguments(self, parser): + parser.add_argument( + "org_id", type=str, help="Organization ID or organization name" + ) + parser.add_argument( + "limit", type=int, help="Concurrent request limit (positive integer)" + ) + + def handle(self, *args, **options): + org_id = options["org_id"] + limit = options["limit"] + + # Validate limit + if limit <= 0: + raise CommandError("Limit must be a positive integer") + + # Get organization (try organization_id first, then name) + try: + organization = Organization.objects.get(organization_id=org_id) + except Organization.DoesNotExist: + # Try by name + try: + organization = Organization.objects.get(name=org_id) + except Organization.DoesNotExist: + raise CommandError( + f'Organization with ID or name "{org_id}" does not exist' + ) + + # Create or update rate limit + # Cache is automatically cleared via model.save() + org_rate_limit, created = OrganizationRateLimit.objects.update_or_create( + organization=organization, defaults={"concurrent_request_limit": limit} + ) + + action = "Created" if created else "Updated" + self.stdout.write( + self.style.SUCCESS( + f'{action} rate limit for organization "{organization.name}" ' + f"({organization.organization_id}): {limit}" + ) + ) + + # Show current usage + try: + usage = APIDeploymentRateLimiter.get_current_usage(organization) + self.stdout.write( + self.style.WARNING( + f'Current usage: {usage["org_count"]}/{limit} concurrent requests' + ) + ) + + if usage["org_count"] >= limit: + self.stdout.write( + self.style.ERROR( + "WARNING: Current usage exceeds new limit! " + "New requests will be rate limited until usage drops." + ) + ) + except Exception as e: + self.stdout.write(self.style.WARNING(f"Could not fetch current usage: {e}")) + + self.stdout.write(self.style.SUCCESS("✓ Cache automatically cleared")) diff --git a/backend/api_v2/migrations/0003_add_organization_rate_limit.py b/backend/api_v2/migrations/0003_add_organization_rate_limit.py new file mode 100644 index 0000000000..2cdcda92e7 --- /dev/null +++ b/backend/api_v2/migrations/0003_add_organization_rate_limit.py @@ -0,0 +1,61 @@ +# Generated by Django 4.2.1 on 2025-11-08 12:26 + +import uuid + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("account_v2", "0002_user_auth_provider"), + ("api_v2", "0002_apideployment_shared_to_org_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="OrganizationRateLimit", + fields=[ + ("created_at", models.DateTimeField(auto_now_add=True)), + ("modified_at", models.DateTimeField(auto_now=True)), + ( + "id", + models.UUIDField( + default=uuid.uuid4, + editable=False, + primary_key=True, + serialize=False, + ), + ), + ( + "concurrent_request_limit", + models.IntegerField( + db_comment="Maximum number of concurrent API deployment requests allowed for this organization", + default=5, + ), + ), + ( + "organization", + models.ForeignKey( + blank=True, + db_comment="Foreign key reference to the Organization model.", + default=None, + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="account_v2.organization", + ), + ), + ], + options={ + "verbose_name": "Organization Rate Limit", + "verbose_name_plural": "Organization Rate Limits", + "db_table": "organization_rate_limit", + }, + ), + migrations.AddConstraint( + model_name="organizationratelimit", + constraint=models.UniqueConstraint( + fields=("organization",), name="unique_org_rate_limit" + ), + ), + ] diff --git a/backend/api_v2/models.py b/backend/api_v2/models.py index e4d67ec61b..addc1d980c 100644 --- a/backend/api_v2/models.py +++ b/backend/api_v2/models.py @@ -1,8 +1,11 @@ +import logging import uuid from typing import Any from account_v2.models import User from django.db import models +from django.db.models.signals import post_delete +from django.dispatch import receiver from pipeline_v2.models import Pipeline from utils.models.base_model import BaseModel from utils.models.organization_mixin import ( @@ -14,6 +17,8 @@ from api_v2.constants import ApiExecution +logger = logging.getLogger(__name__) + API_NAME_MAX_LENGTH = 30 DESCRIPTION_MAX_LENGTH = 255 API_ENDPOINT_MAX_LENGTH = 255 @@ -147,6 +152,46 @@ class Meta: ] +class OrganizationRateLimit(DefaultOrganizationMixin, BaseModel): + """Model to store organization-specific API deployment rate limits.""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + concurrent_request_limit = models.IntegerField( + default=5, + db_comment="Maximum number of concurrent API deployment requests allowed for this organization", + ) + + def __str__(self) -> str: + return f"{self.organization} - Limit: {self.concurrent_request_limit}" + + def save(self, *args, **kwargs): + """Save and automatically clear cache.""" + super().save(*args, **kwargs) + self._clear_cache() + + def _clear_cache(self): + """Clear cached limit for this organization.""" + from django.core.cache import cache + + from api_v2.rate_limit_constants import RateLimitKeys + + org_id = str(self.organization.organization_id) + cache_key = RateLimitKeys.get_org_limit_cache_key(org_id) + cache.delete(cache_key) + logger.info(f"Cleared rate limit cache after save: org {org_id}") + + class Meta: + verbose_name = "Organization Rate Limit" + verbose_name_plural = "Organization Rate Limits" + db_table = "organization_rate_limit" + constraints = [ + models.UniqueConstraint( + fields=["organization"], + name="unique_org_rate_limit", + ), + ] + + class APIKey(BaseModel): id = models.UUIDField( primary_key=True, @@ -217,3 +262,17 @@ class Meta: verbose_name = "Api Deployment key" verbose_name_plural = "Api Deployment keys" db_table = "api_deployment_key" + + +# Signal handlers for OrganizationRateLimit +@receiver(post_delete, sender=OrganizationRateLimit) +def clear_org_rate_limit_cache_on_delete(sender, instance, **kwargs): + """Clear cache when rate limit record is deleted.""" + from django.core.cache import cache + + from api_v2.rate_limit_constants import RateLimitKeys + + org_id = str(instance.organization.organization_id) + cache_key = RateLimitKeys.get_org_limit_cache_key(org_id) + cache.delete(cache_key) + logger.info(f"Cleared rate limit cache after delete: org {org_id}") diff --git a/backend/api_v2/rate_limit_constants.py b/backend/api_v2/rate_limit_constants.py new file mode 100644 index 0000000000..069171cad5 --- /dev/null +++ b/backend/api_v2/rate_limit_constants.py @@ -0,0 +1,117 @@ +"""Constants and key generators for API deployment rate limiting. + +Centralizes all Redis key patterns, cache keys, and configuration constants +to avoid duplication and make maintenance easier. +""" + + +class RateLimitKeys: + """Redis key patterns for rate limiting. + + All rate limiting keys follow a consistent naming convention: + - ZSET keys: api_deployment:rate_limit:{scope}:{id} + - Lock keys: lock:rate_limit:{scope}:{id} + - Cache keys: rate_limit:cache:{type}:{id} + """ + + # ZSET keys for tracking active executions + GLOBAL_EXECUTIONS_KEY = "api_deployment:rate_limit:global" + ORG_EXECUTIONS_KEY_PATTERN = "api_deployment:rate_limit:org:{org_id}" + + # Lock keys for distributed locking + GLOBAL_LOCK_KEY = "lock:rate_limit:global" + ORG_LOCK_KEY_PATTERN = "lock:rate_limit:org:{org_id}" + + # Django cache keys for caching DB values + ORG_LIMIT_CACHE_KEY_PATTERN = "rate_limit:cache:org_limit:{org_id}" + + @classmethod + def get_org_executions_key(cls, org_id: str) -> str: + """Get Redis ZSET key for organization's active executions. + + Args: + org_id: Organization UUID as string + + Returns: + Redis key for org's execution ZSET + """ + return cls.ORG_EXECUTIONS_KEY_PATTERN.format(org_id=org_id) + + @classmethod + def get_org_lock_key(cls, org_id: str) -> str: + """Get Redis lock key for organization rate limiting. + + Args: + org_id: Organization UUID as string + + Returns: + Redis key for org's distributed lock + """ + return cls.ORG_LOCK_KEY_PATTERN.format(org_id=org_id) + + @classmethod + def get_org_limit_cache_key(cls, org_id: str) -> str: + """Get Django cache key for organization's rate limit value. + + Args: + org_id: Organization UUID as string + + Returns: + Django cache key for org's limit + """ + return cls.ORG_LIMIT_CACHE_KEY_PATTERN.format(org_id=org_id) + + +class RateLimitDefaults: + """Default values for rate limiting configuration. + + These are fallback values when settings are not configured. + Actual values should be set via Django settings / environment variables. + """ + + # Rate limits + DEFAULT_ORG_LIMIT = 20 # Concurrent requests per organization + DEFAULT_GLOBAL_LIMIT = 100 # Concurrent requests system-wide + + # TTL and timing + DEFAULT_TTL_HOURS = 6 # Hours to keep execution in ZSET + DEFAULT_CACHE_TTL_SECONDS = 600 # 10 minutes cache for org limits + + # Lock timeouts + DEFAULT_LOCK_TIMEOUT_SECONDS = 2 # Lock auto-expires + DEFAULT_LOCK_BLOCKING_TIMEOUT_SECONDS = 5 # Wait time to acquire + + +class RateLimitMessages: + """User-facing messages for rate limiting. + + Centralized messages for consistency across the application. + """ + + ORG_LIMIT_EXCEEDED_TEMPLATE = ( + "Organization has reached the maximum concurrent API requests limit " + "({current_usage}/{limit}). Please try again later." + ) + + GLOBAL_LIMIT_EXCEEDED_TEMPLATE = ( + "System has reached the global maximum concurrent API requests limit " + "({current_usage}/{limit}). Please try again later." + ) + + LOCK_ACQUISITION_FAILED = "Failed to acquire rate limit lock. Please try again." + + REDIS_ERROR = "Rate limiting service temporarily unavailable. Request allowed." + + @classmethod + def get_org_limit_exceeded_message(cls, current_usage: int, limit: int) -> str: + """Get formatted organization limit exceeded message.""" + return cls.ORG_LIMIT_EXCEEDED_TEMPLATE.format( + current_usage=current_usage, limit=limit + ) + + @classmethod + def get_global_limit_exceeded_message(cls, current_usage: int, limit: int) -> str: + """Get formatted global limit exceeded message.""" + return cls.GLOBAL_LIMIT_EXCEEDED_TEMPLATE.format( + current_usage=current_usage, limit=limit + ) diff --git a/backend/api_v2/rate_limiter.py b/backend/api_v2/rate_limiter.py new file mode 100644 index 0000000000..f2b10a3b7b --- /dev/null +++ b/backend/api_v2/rate_limiter.py @@ -0,0 +1,364 @@ +import logging +import time + +from account_v2.models import Organization +from django.conf import settings +from django.core.cache import cache +from django_redis import get_redis_connection + +from api_v2.models import OrganizationRateLimit +from api_v2.rate_limit_constants import ( + RateLimitDefaults, + RateLimitKeys, +) + +logger = logging.getLogger(__name__) + +redis_cache = get_redis_connection("default") + + +class APIDeploymentRateLimiter: + """Rate limiter for API deployment concurrent requests using Redis ZSET with TTL.""" + + @classmethod + def _get_org_key(cls, org_id: str) -> str: + """Generate Redis key for organization-specific rate limiting.""" + return RateLimitKeys.get_org_executions_key(org_id) + + @classmethod + def _get_ttl_seconds(cls) -> int: + """Get TTL in seconds from hours setting.""" + ttl_hours = getattr( + settings, + "API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS", + RateLimitDefaults.DEFAULT_TTL_HOURS, + ) + return ttl_hours * 3600 + + @classmethod + def _get_cutoff_timestamp(cls) -> float: + """Get timestamp cutoff for removing stale entries.""" + return time.time() - cls._get_ttl_seconds() + + @classmethod + def _cleanup_expired_entries(cls, key: str) -> None: + """Remove entries older than TTL using ZREMRANGEBYSCORE.""" + cutoff = cls._get_cutoff_timestamp() + redis_cache.zremrangebyscore(key, 0, cutoff) + + @classmethod + def _get_org_limit(cls, organization: Organization) -> int: + """Get the concurrent request limit for an organization. + + Uses Django cache framework to avoid DB queries on every request. + Cache automatically cleared on OrganizationRateLimit save/delete. + TTL is refreshed on every read to keep frequently-used limits cached. + + Args: + organization: Organization instance + + Returns: + Concurrent request limit for the organization + """ + org_id = str(organization.organization_id) + cache_key = RateLimitKeys.get_org_limit_cache_key(org_id) + cache_ttl = getattr( + settings, + "API_DEPLOYMENT_RATE_LIMIT_CACHE_TTL", + RateLimitDefaults.DEFAULT_CACHE_TTL_SECONDS, + ) + + # Try cache first + cached_limit = cache.get(cache_key) + if cached_limit is not None: + # Refresh TTL on cache hit (extends TTL for frequently-used orgs) + cache.set(cache_key, cached_limit, cache_ttl) + return cached_limit + + # Cache miss - query DB and cache result + try: + org_rate_limit = OrganizationRateLimit.objects.get(organization=organization) + limit = org_rate_limit.concurrent_request_limit + except OrganizationRateLimit.DoesNotExist: + limit = getattr( + settings, + "API_DEPLOYMENT_DEFAULT_RATE_LIMIT", + RateLimitDefaults.DEFAULT_ORG_LIMIT, + ) + + # Cache with TTL + cache.set(cache_key, limit, cache_ttl) + + return limit + + @classmethod + def clear_org_limit_cache(cls, org_id: str) -> None: + """Clear cached rate limit for an organization. + + Args: + org_id: Organization UUID as string + """ + cache_key = RateLimitKeys.get_org_limit_cache_key(org_id) + cache.delete(cache_key) + logger.info(f"Cleared rate limit cache for org {org_id}") + + @classmethod + def get_current_usage(cls, organization: Organization) -> dict: + """Get current usage statistics for an organization. + + Returns: + dict: { + 'org_count': int - current org-level concurrent requests, + 'global_count': int - current system-wide concurrent requests, + 'org_limit': int - org-level limit, + 'global_limit': int - global limit + } + + Raises: + Exception: If Redis operations fail + """ + org_key = cls._get_org_key(str(organization.organization_id)) + + # Cleanup expired entries before counting + # NOTE: Manual cleanup is required even though ZSET keys have TTL because: + # 1. Redis TTL expires the entire ZSET key after inactivity, not individual entries + # 2. Individual ZSET entries remain until the whole key expires + # 3. Without cleanup, stale entries skew the count and cause incorrect rate limiting + # 4. ZREMRANGEBYSCORE removes entries older than 6 hours (keeps count accurate) + # 5. TTL (expire) garbage collects the entire key if org becomes inactive + # Both mechanisms work together: manual cleanup + key TTL + cls._cleanup_expired_entries(org_key) + cls._cleanup_expired_entries(RateLimitKeys.GLOBAL_EXECUTIONS_KEY) + + org_count = redis_cache.zcard(org_key) + global_count = redis_cache.zcard(RateLimitKeys.GLOBAL_EXECUTIONS_KEY) + org_limit = cls._get_org_limit(organization) + global_limit = getattr( + settings, + "API_DEPLOYMENT_GLOBAL_RATE_LIMIT", + RateLimitDefaults.DEFAULT_GLOBAL_LIMIT, + ) + + return { + "org_count": org_count, + "global_count": global_count, + "org_limit": org_limit, + "global_limit": global_limit, + } + + @classmethod + def check_and_acquire( + cls, organization: Organization, execution_id: str + ) -> tuple[bool, dict | None]: + """Atomically check rate limits and acquire slot using per-org Redis lock. + + This implementation uses per-organization locks to prevent race conditions + within each organization. The global limit is checked but not locked, which + means under extreme concurrent load across many organizations, the global + limit may briefly be exceeded by 1-2% before self-correcting. + + Future Enhancement: To strictly enforce global limits without any tolerance, + add a second lock acquisition for global limit checking: + + ```python + # After org limit check passes + from redis.lock import Lock + + global_lock = Lock( + redis_cache, + RateLimitKeys.GLOBAL_LOCK_KEY, + timeout=settings.API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT, + ) + try: + if not global_lock.acquire(blocking=True, blocking_timeout=...): + return True, None # Fail open + + # Re-check global limit with lock held + global_count = redis_cache.zcard(RateLimitKeys.GLOBAL_EXECUTIONS_KEY) + if global_count >= global_limit: + return False, {...} + + # Add to both ZSETs while holding global lock + pipe.zadd(...) + pipe.execute() + finally: + global_lock.release() + ``` + + Args: + organization: Organization instance + execution_id: Unique execution identifier to track + + Returns: + tuple: (can_proceed: bool, limit_info: dict or None) + If can_proceed is False, limit_info contains details about the exceeded limit + """ + org_id = str(organization.organization_id) + org_lock_key = RateLimitKeys.get_org_lock_key(org_id) + org_key = cls._get_org_key(org_id) + current_timestamp = time.time() + cutoff = cls._get_cutoff_timestamp() + ttl_seconds = cls._get_ttl_seconds() + + from redis.lock import Lock + + lock_timeout = getattr( + settings, + "API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT", + RateLimitDefaults.DEFAULT_LOCK_TIMEOUT_SECONDS, + ) + lock_blocking_timeout = getattr( + settings, + "API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT", + RateLimitDefaults.DEFAULT_LOCK_BLOCKING_TIMEOUT_SECONDS, + ) + + org_lock = Lock( + redis_cache, + org_lock_key, + timeout=lock_timeout, + blocking_timeout=lock_blocking_timeout, + ) + + try: + # Acquire per-organization lock + acquired = org_lock.acquire(blocking=True) + if not acquired: + logger.error(f"Failed to acquire rate limit lock for org {org_id}") + # Fail open: allow request to proceed + return True, None + + # Cleanup expired entries + redis_cache.zremrangebyscore(org_key, 0, cutoff) + redis_cache.zremrangebyscore(RateLimitKeys.GLOBAL_EXECUTIONS_KEY, 0, cutoff) + + # Check org-level limit + org_count = redis_cache.zcard(org_key) + org_limit = cls._get_org_limit(organization) + + if org_count >= org_limit: + logger.warning( + f"Organization {org_id} hit rate limit: {org_count}/{org_limit}" + ) + return False, { + "limit_type": "organization", + "current_usage": org_count, + "limit": org_limit, + } + + # Check global limit (no lock - eventual consistency) + global_count = redis_cache.zcard(RateLimitKeys.GLOBAL_EXECUTIONS_KEY) + global_limit = getattr( + settings, + "API_DEPLOYMENT_GLOBAL_RATE_LIMIT", + RateLimitDefaults.DEFAULT_GLOBAL_LIMIT, + ) + + if global_count >= global_limit: + logger.warning( + f"Global rate limit exceeded: {global_count}/{global_limit}" + ) + return False, { + "limit_type": "global", + "current_usage": global_count, + "limit": global_limit, + } + + # Both checks passed - add to both ZSETs atomically + pipe = redis_cache.pipeline() + pipe.zadd(org_key, {execution_id: current_timestamp}) + pipe.expire(org_key, ttl_seconds) + pipe.zadd( + RateLimitKeys.GLOBAL_EXECUTIONS_KEY, {execution_id: current_timestamp} + ) + pipe.expire(RateLimitKeys.GLOBAL_EXECUTIONS_KEY, ttl_seconds) + pipe.execute() + + logger.info( + f"Rate limit slot acquired for org {org_id}, execution {execution_id}" + ) + return True, None + + except Exception as e: + logger.error(f"Error in rate limit check for org {org_id}: {e}") + # Fail open: allow request on errors + return True, None + + finally: + try: + org_lock.release() + except Exception as e: + # Lock may have already expired or been released + logger.debug(f"Error releasing lock for org {org_id}: {e}") + + @classmethod + def check_rate_limit(cls, organization: Organization) -> tuple[bool, dict | None]: + """Check if a new request can be accepted without exceeding rate limits. + + DEPRECATED: Use check_and_acquire() instead for atomic check-and-acquire. + This method is kept for backward compatibility but has a race condition. + + Args: + organization: Organization instance + + Returns: + tuple: (can_proceed: bool, limit_info: dict or None) + If can_proceed is False, limit_info contains details about the exceeded limit + """ + try: + usage = cls.get_current_usage(organization) + + # Check org-level limit + if usage["org_count"] >= usage["org_limit"]: + logger.warning( + f"Organization {organization.organization_id} hit rate limit: " + f"{usage['org_count']}/{usage['org_limit']}" + ) + return False, { + "limit_type": "organization", + "current_usage": usage["org_count"], + "limit": usage["org_limit"], + } + + # Check global limit + if usage["global_count"] >= usage["global_limit"]: + logger.warning( + f"Global rate limit exceeded: {usage['global_count']}/{usage['global_limit']}" + ) + return False, { + "limit_type": "global", + "current_usage": usage["global_count"], + "limit": usage["global_limit"], + } + + return True, None + except Exception as e: + # If Redis fails, allow the request to proceed (fail open) + logger.error( + f"Rate limit check failed for org {organization.organization_id}: {e}. " + "Allowing request to proceed." + ) + return True, None + + @classmethod + def release_slot(cls, organization_id: str, execution_id: str) -> None: + """Release a rate limit slot when execution completes. + + Args: + organization_id: Organization identifier + execution_id: Execution identifier to release + """ + org_key = cls._get_org_key(organization_id) + + try: + # Use pipeline to remove from both keys atomically + pipe = redis_cache.pipeline() + pipe.zrem(org_key, execution_id) + pipe.zrem(RateLimitKeys.GLOBAL_EXECUTIONS_KEY, execution_id) + pipe.execute() + + logger.debug( + f"Rate limit slot released for org {organization_id}, execution {execution_id}" + ) + except Exception as e: + logger.error(f"Failed to release rate limit slot: {e}") diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 130a7e0363..26f74c747e 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -108,6 +108,28 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | os.environ.get("API_DEPL_PRESIGNED_URL_MAX_FILE_SIZE_MB", 20) ) +# API Deployment Rate Limiting +API_DEPLOYMENT_DEFAULT_RATE_LIMIT = int( + os.environ.get("API_DEPLOYMENT_DEFAULT_RATE_LIMIT", 20) +) +API_DEPLOYMENT_GLOBAL_RATE_LIMIT = int( + os.environ.get("API_DEPLOYMENT_GLOBAL_RATE_LIMIT", 100) +) +API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS = int( + os.environ.get("API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS", 6) +) +# Cache TTL for organization rate limits (in seconds) +API_DEPLOYMENT_RATE_LIMIT_CACHE_TTL = int( + os.environ.get("API_DEPLOYMENT_RATE_LIMIT_CACHE_TTL", 600) +) +# Redis lock timeouts for rate limiting (in seconds) +API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT = int( + os.environ.get("API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT", 2) +) +API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT = int( + os.environ.get("API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT", 5) +) + DB_NAME = os.environ.get("DB_NAME", "unstract_db") DB_USER = os.environ.get("DB_USER", "unstract_dev") DB_HOST = os.environ.get("DB_HOST", "backend-db-1") diff --git a/backend/sample.env b/backend/sample.env index af73870e16..631819b98d 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -110,6 +110,21 @@ ENCRYPTION_KEY="Sample-Key" # Cache TTL CACHE_TTL_SEC=10800 +# API Deployment Rate Limiting +# Default concurrent request limit per organization +API_DEPLOYMENT_DEFAULT_RATE_LIMIT=20 +# Global system-wide concurrent request limit across all organizations +API_DEPLOYMENT_GLOBAL_RATE_LIMIT=100 +# Time window (in hours) to consider requests as "active" for rate limiting +API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS=6 +# Cache TTL for organization rate limits (in seconds) - TTL is refreshed on every read +# Frequently-used orgs stay cached, inactive orgs expire after this duration +API_DEPLOYMENT_RATE_LIMIT_CACHE_TTL=600 +# Redis lock timeout (in seconds) - lock auto-expires if holder crashes +API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT=2 +# Redis lock blocking timeout (in seconds) - how long to wait to acquire lock +API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT=5 + # Default user auth credentials DEFAULT_AUTH_USERNAME= DEFAULT_AUTH_PASSWORD= diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 9ce0187bb5..11b5fdb37f 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -296,6 +296,8 @@ def update_execution( error (Optional[str], optional): Error message if any. Defaults to None. increment_attempt (bool, optional): Whether to increment attempt counter. Defaults to False. """ + should_release_rate_limit = False + if status is not None: status = ExecutionStatus(status) self.status = status.value @@ -305,6 +307,7 @@ def update_execution( ExecutionStatus.STOPPED, ]: self.execution_time = CommonUtils.time_since(self.created_at, 3) + should_release_rate_limit = True if error: self.error_message = error[:EXECUTION_ERROR_LENGTH] @@ -313,6 +316,31 @@ def update_execution( self.save() + # Release rate limit slot for API deployment executions after save + if should_release_rate_limit and self.pipeline_id: + self._release_api_deployment_rate_limit() + + def _release_api_deployment_rate_limit(self) -> None: + """Release rate limit slot for API deployment executions. + + Checks if this execution is for an API deployment and releases + the rate limit slot if applicable. + """ + try: + # Check if this is an API deployment execution + api_deployment = APIDeployment.objects.filter(id=self.pipeline_id).first() + if api_deployment and api_deployment.organization: + from api_v2.rate_limiter import APIDeploymentRateLimiter + + APIDeploymentRateLimiter.release_slot( + str(api_deployment.organization.organization_id), str(self.id) + ) + except Exception as e: + # Log but don't fail the execution update for rate limit release errors + logger.exception( + f"Failed to release rate limit slot for execution {self.id}: {e}" + ) + def update_execution_err(self, err_msg: str = "") -> None: """Update execution status to ERROR with an error message. diff --git a/docs/API_DEPLOYMENT_RATE_LIMITING.md b/docs/API_DEPLOYMENT_RATE_LIMITING.md new file mode 100644 index 0000000000..a7465b530e --- /dev/null +++ b/docs/API_DEPLOYMENT_RATE_LIMITING.md @@ -0,0 +1,656 @@ +# API Deployment Rate Limiting + +## Overview + +This document describes the rate limiting implementation for API deployments in Unstract. Rate limiting controls the number of concurrent API deployment requests to prevent resource exhaustion and ensure fair usage across organizations. + +**Scope**: This rate limiting **ONLY applies to API deployments** (REST API endpoints). It does **NOT** affect: +- ETL pipeline executions +- Manual workflow runs from the UI +- Scheduled tasks +- Background jobs + +## Architecture + +### Dual-Layer Rate Limiting + +The system implements two layers of rate limiting: + +1. **Per-Organization Limit**: Each organization has a maximum number of concurrent API requests +2. **Global Limit**: System-wide limit across all organizations + +Both limits are enforced, and requests are rejected if either limit is exceeded. + +### Key Components + +``` +┌─────────────────────────────────────────────────────────────┐ +│ API Request Flow (View Layer) │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ + ┌───────────────────────────────┐ + │ 1. Validate Request │ + │ (Serializer, Files) │ + └───────────────────────────────┘ + │ + ▼ + ┌───────────────────────────────┐ + │ 2. Check Organization Limit │ + │ (Per-Org Redis Lock) │ + └───────────────────────────────┘ + │ + ┌───────┴───────┐ + │ │ + ✓ Available ✗ Exceeded + │ │ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ 3. Check │ │ Raise 429 │ + │ Global │ │ Rate Limit │ + │ Limit │ │ Exceeded │ + └──────────────┘ └──────────────┘ + │ + ┌───────┴───────┐ + │ │ + ✓ Available ✗ Exceeded + │ │ + ▼ ▼ + ┌──────────────┐ ┌──────────────┐ + │ 4. Acquire │ │ Raise 429 │ + │ Slot │ │ Rate Limit │ + │ (ZSET Add) │ │ Exceeded │ + └──────────────┘ └──────────────┘ + │ + ▼ + ┌──────────────────────────────────┐ + │ 5. Execute Workflow (try-catch) │ + │ - Setup (DB, files, queue) │ + │ - Dispatch async job │ + └──────────────────────────────────┘ + │ + ┌───┴───┐ + │ │ + Success Exception + │ │ + ▼ ▼ + ┌─────┐ ┌──────────────┐ + │ 6a. │ │ 6b. Release │ + │Signal│ │ Slot │ + │Will │ │ (Manual in │ + │Auto │ │ Exception │ + │Release│ │ Handler) │ + └─────┘ └──────────────┘ + │ + ▼ + ┌──────────────┐ + │ Re-raise │ + │ Exception │ + └──────────────┘ +``` + +**Key Design Points:** + +- **View Layer Responsibility**: Rate limiting handled in `api_deployment_views.py` +- **Dual Release Paths**: + - **View layer** releases on exceptions that propagate (early setup failures: DB, files) + - **Helper layer** releases on caught exceptions (async dispatch failures, config errors) + - **Signal** releases on successful async job completion +- **No Orphaned Slots**: Guaranteed cleanup on all error paths +- **No Double-Release**: Each failure path has exactly one release point + +### Technical Implementation + +#### 1. Redis Distributed Locks (Per-Organization) +- Each organization has a dedicated Redis lock +- Ensures atomic check-and-acquire operations +- Prevents race conditions within the same organization +- Lock timeout: 2 seconds (auto-release if holder crashes) +- Blocking timeout: 5 seconds (wait time to acquire lock) + +#### 2. Redis Sorted Sets (ZSET) +- Tracks active executions with timestamps +- Automatic TTL-based cleanup (6 hours default) +- Separate ZSETs for: + - Per-organization tracking: `api_deployment:rate_limit:org:{org_id}` + - Global tracking: `api_deployment:rate_limit:global` + +#### 3. Django Cache (Organization Limits) +- Caches organization rate limits from database +- **TTL**: 10 minutes +- **TTL Refresh**: Extended by 10 minutes on every read (LRU-like behavior) +- **Auto-invalidation**: Cache cleared on limit update/delete +- **Benefit**: ~95% reduction in database queries + +#### 4. Atomic Check-and-Acquire +```python +# Pseudocode +with redis_lock(org_lock_key): + cleanup_expired_entries() + if org_count >= org_limit: + return RATE_LIMIT_EXCEEDED + if global_count >= global_limit: + return RATE_LIMIT_EXCEEDED + # Atomically add to both ZSETs + zadd(org_key, execution_id, timestamp) + zadd(global_key, execution_id, timestamp) + return SUCCESS +``` + +#### 5. Automatic Release +- Slot automatically released when workflow execution completes +- Triggered by `WorkflowExecution.update_execution()` on completion/failure +- Fail-safe: Even if release fails, entries expire after TTL (6 hours) + +### Fail-Open Strategy + +If Redis is unavailable or operations fail, the system **allows requests** to proceed. This prevents rate limiting infrastructure issues from blocking legitimate traffic. + +## Configuration + +### Environment Variables + +All configuration is done via environment variables in `.env` or system environment: + +```bash +# Default concurrent request limit per organization (default: 20) +API_DEPLOYMENT_DEFAULT_RATE_LIMIT=20 + +# Global system-wide concurrent request limit (default: 100) +API_DEPLOYMENT_GLOBAL_RATE_LIMIT=100 + +# Time window (in hours) to keep requests as "active" (default: 6) +# Entries older than this are automatically cleaned up +API_DEPLOYMENT_RATE_LIMIT_TTL_HOURS=6 + +# Cache TTL for organization limits (in seconds, default: 600) +# TTL is refreshed on every read - frequently-used orgs stay cached +# Inactive orgs expire after this duration +API_DEPLOYMENT_RATE_LIMIT_CACHE_TTL=600 + +# Redis lock timeout (in seconds, default: 2) +# Lock auto-expires if holder crashes +API_DEPLOYMENT_RATE_LIMIT_LOCK_TIMEOUT=2 + +# Redis lock blocking timeout (in seconds, default: 5) +# How long to wait to acquire lock before giving up +API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT=5 +``` + +### Django Settings + +These environment variables are loaded in `backend/settings/base.py`: + +```python +API_DEPLOYMENT_DEFAULT_RATE_LIMIT = int(os.environ.get("API_DEPLOYMENT_DEFAULT_RATE_LIMIT", 20)) +API_DEPLOYMENT_GLOBAL_RATE_LIMIT = int(os.environ.get("API_DEPLOYMENT_GLOBAL_RATE_LIMIT", 100)) +# ... etc +``` + +## Management Commands + +Five management commands are provided for administrative operations: + +### 1. Set Organization Rate Limit + +Set or update a custom rate limit for a specific organization: + +```bash +# Using organization ID +python manage.py set_org_rate_limit org_a1b2c3d4e5f6g7h8 50 + +# Using organization name +python manage.py set_org_rate_limit acme-corp 50 +``` + +**Output**: +``` +Created rate limit for organization "acme-corp" (org_a1b2c3d4e5f6g7h8): 50 +Current usage: 5/50 concurrent requests +✓ Cache automatically cleared +``` + +**Features**: +- Accepts organization ID or name +- Auto-clears cache after update +- Shows current usage +- Warns if current usage exceeds new limit + +### 2. Get Organization Rate Limit + +View rate limit information and current usage for an organization: + +```bash +# View rate limit info +python manage.py get_org_rate_limit org_a1b2c3d4e5f6g7h8 + +# Clear cache and force refresh from DB +python manage.py get_org_rate_limit org_a1b2c3d4e5f6g7h8 --clear-cache +``` + +**Output**: +``` +Database Limit: 50 +Last Modified: 2025-11-08 10:30:00 +Cached Limit: 50 ✓ + +--- Current Usage --- +Organization: 5/50 concurrent requests +Global System: 45/100 concurrent requests +Organization at 10.0% capacity +``` + +### 3. List All Organization Rate Limits + +List all organizations with custom rate limits: + +```bash +# List without usage stats (fast) +python manage.py list_org_rate_limits + +# List with current usage (slower, queries Redis) +python manage.py list_org_rate_limits --with-usage +``` + +**Output**: +``` +Found 3 custom rate limits: + +• acme-corp (org_a1b2c3d4e5f6g7h8) + Limit: 50 + Usage: 5/50 (10.0%) + +• widgets-inc (org_r1II1U07Th3RnSfv) + Limit: 100 + Usage: 0/100 (0.0%) + +• tech-startup (org_zj4xaTiCdTToPlaj) + Limit: 25 + Usage: 18/25 (72.0%) +``` + +### 4. Delete Organization Rate Limit + +Remove a custom rate limit (organization reverts to system default): + +```bash +# With confirmation prompt +python manage.py delete_org_rate_limit org_a1b2c3d4e5f6g7h8 + +# Skip confirmation +python manage.py delete_org_rate_limit acme-corp --force +``` + +**Output**: +``` +Organization: acme-corp (org_a1b2c3d4e5f6g7h8) +Current custom limit: 50 + +This will delete the custom rate limit and revert to the system default. +Continue? [y/N]: y + +Deleted custom rate limit for organization "acme-corp" (org_a1b2c3d4e5f6g7h8) +Will now use system default: 20 +Current usage: 5/20 concurrent requests +✓ Cache automatically cleared +``` + +### 5. Clear Organization Rate Limit Cache + +Clear cached rate limits (useful after changing default limit via ENV): + +```bash +# Clear specific organization cache +python manage.py clear_org_rate_limit_cache --org-id org_a1b2c3d4e5f6g7h8 + +# Clear cache for all orgs with custom limits (default) +python manage.py clear_org_rate_limit_cache + +# Clear cache for ALL organizations (uses Redis pattern deletion) +python manage.py clear_org_rate_limit_cache --all +``` + +**Output** (for `--all`): +``` +Clearing cache for ALL organizations (including those using defaults)... +Deleted 138 cache keys using pattern +✓ Cleared all organization rate limit caches using pattern deletion +Note: Cache will be repopulated on next API request for each org +``` + +**Performance**: +- With Redis cache backend: Uses `delete_pattern("rate_limit:cache:org_limit:*")` - very fast +- Other cache backends: Falls back to iterating through organizations + +## Usage Scenarios + +### Scenario 1: Set Custom Limit for High-Volume Customer + +```bash +# Customer needs 200 concurrent requests +python manage.py set_org_rate_limit acme-corp 200 + +# Verify +python manage.py get_org_rate_limit acme-corp +``` + +### Scenario 2: Update System Default Limit + +```bash +# Step 1: Update environment variable +export API_DEPLOYMENT_DEFAULT_RATE_LIMIT=30 + +# Step 2: Restart application to load new setting +# (or reload via your deployment process) + +# Step 3: Clear all caches to pick up immediately +python manage.py clear_org_rate_limit_cache --all + +# Now all orgs without custom limits will use 30 +``` + +### Scenario 3: Monitor High-Usage Organizations + +```bash +# List all orgs with usage stats +python manage.py list_org_rate_limits --with-usage | grep -E "([7-9][0-9]|100)\.0%" + +# Check specific org +python manage.py get_org_rate_limit acme-corp +``` + +### Scenario 4: Temporarily Reduce Limit During Incident + +```bash +# Reduce limit to prevent overload +python manage.py set_org_rate_limit acme-corp 10 + +# ... incident resolved ... + +# Restore original limit +python manage.py set_org_rate_limit acme-corp 50 +``` + +### Scenario 5: Remove All Custom Limits (Revert to Defaults) + +```bash +# List all custom limits +python manage.py list_org_rate_limits + +# Delete each custom limit +python manage.py delete_org_rate_limit acme-corp --force +python manage.py delete_org_rate_limit widgets-inc --force +python manage.py delete_org_rate_limit tech-startup --force + +# All orgs now use system default (20) +``` + +## API Response Behavior + +### Successful Request (200 OK) + +When rate limit is not exceeded, requests proceed normally: + +```http +HTTP/1.1 200 OK +Content-Type: application/json + +{ + "execution_id": "abc123...", + "status": "queued", + ... +} +``` + +### Rate Limit Exceeded (429 Too Many Requests) + +When rate limit is exceeded, API returns 429 with a standardized error response: + +```http +HTTP/1.1 429 Too Many Requests +Content-Type: application/json + +{ + "type": "client_error", + "errors": [ + { + "code": "error", + "detail": "Organization has reached the maximum concurrent API requests limit (20/20). Please try again later.", + "attr": null + } + ] +} +``` + +**Response Format**: +The error response follows the `drf-standardized-errors` format used throughout the application: +- `type`: Error type (always `"client_error"` for 429) +- `errors`: Array of error objects + - `code`: Error code (always `"error"`) + - `detail`: Human-readable error message with current usage and limit + - `attr`: Field name (null for non-field errors) + +**Note**: Clients should implement their own retry logic with exponential backoff. The rate limit will be released once active requests complete. + +## Best Practices + +### 1. Set Custom Limits for Known High-Volume Customers + +```bash +# Identify high-volume customers +python manage.py list_org_rate_limits --with-usage + +# Set custom limits proactively +python manage.py set_org_rate_limit high-volume-customer 100 +``` + +### 2. Monitor Global Limit Usage + +If global limit is frequently hit, consider increasing it: + +```bash +# Check current global usage +python manage.py get_org_rate_limit any-org-id # Shows global usage + +# If consistently high, increase global limit +export API_DEPLOYMENT_GLOBAL_RATE_LIMIT=200 +# Restart application +``` + +### 3. Use TTL Refresh for Efficiency + +The cache TTL refresh (10 minutes, extended on read) provides optimal performance: +- Active orgs stay cached indefinitely (no DB queries) +- Inactive orgs expire after 10 minutes +- No manual cache management needed + +### 4. Clear Cache After ENV Changes + +After changing `API_DEPLOYMENT_DEFAULT_RATE_LIMIT`: + +```bash +# Clear all caches to pick up new default immediately +python manage.py clear_org_rate_limit_cache --all +``` + +### 5. Set Appropriate Global Limit + +The global limit should be: +- **Higher than sum of expected concurrent per-org usage** +- **Lower than system capacity** (to prevent resource exhaustion) + +Example calculation: +- 10 organizations +- Average 20 concurrent requests per org +- Expected total: 200 concurrent requests +- Recommended global limit: 250-300 (buffer for spikes) + +## Troubleshooting + +### Issue: Rate limit hit but usage seems low + +**Check**: +1. Redis cleanup might not have run yet (6-hour TTL) +2. Check actual Redis data: + ```bash + redis-cli + > ZCARD api_deployment:rate_limit:org:org_a1b2c3d4e5f6g7h8 + > ZRANGE api_deployment:rate_limit:org:org_a1b2c3d4e5f6g7h8 0 -1 WITHSCORES + ``` +3. Look for stuck executions (never completed/failed) + +**Solution**: +```bash +# Manually cleanup (run ZREMRANGEBYSCORE with old timestamp) +# Or wait for automatic cleanup (next request) +``` + +### Issue: Custom limit not taking effect + +**Check**: +1. Verify limit in database: + ```bash + python manage.py get_org_rate_limit acme-corp + ``` +2. Check if cache was cleared: + ```bash + python manage.py get_org_rate_limit acme-corp --clear-cache + ``` + +**Solution**: +```bash +# Clear cache and verify +python manage.py clear_org_rate_limit_cache --org-id acme-corp +python manage.py get_org_rate_limit acme-corp +``` + +### Issue: Default limit change not picked up + +**Cause**: Cache still holds old default for orgs without custom limits + +**Solution**: +```bash +# Clear ALL organization caches +python manage.py clear_org_rate_limit_cache --all +``` + +### Issue: Redis connection errors + +**Behavior**: System fails open - requests are allowed + +**Check**: +1. Redis connectivity: `redis-cli ping` +2. Application logs for connection errors + +**Solution**: +1. Fix Redis connection +2. No action needed for rate limiting - it's working as designed (fail-open) + +### Issue: Lock acquisition failures + +**Symptoms**: Logs show "Failed to acquire rate limit lock" + +**Cause**: High contention (many concurrent requests from same org) + +**Solution**: +1. Increase `API_DEPLOYMENT_RATE_LIMIT_LOCK_BLOCKING_TIMEOUT` (default: 5s) +2. Or increase org limit if legitimate traffic + +### Issue: Performance concerns with cache + +**Expected behavior**: +- Cache hit: <1ms (no DB query) +- Cache miss: 10-20ms (DB query + cache set) +- ~95% cache hit rate for active organizations + +**Monitor**: +```bash +# Check cache behavior +python manage.py get_org_rate_limit acme-corp # Should show "Cached Limit: X ✓" +``` + +## Database Schema + +### OrganizationRateLimit Model + +```python +class OrganizationRateLimit(BaseModel): + id = UUIDField(primary_key=True) + organization = ForeignKey(Organization) + concurrent_request_limit = IntegerField(default=5) + created_at = DateTimeField(auto_now_add=True) + modified_at = DateTimeField(auto_now=True) +``` + +**Migration**: `backend/api_v2/migrations/0003_add_organization_rate_limit.py` + +## Performance Characteristics + +### Latency Impact + +Per API request: +- **Cache hit** (most common): +1-2ms +- **Cache miss**: +10-20ms (includes DB query) +- **Lock acquisition**: +5-10ms (Redis roundtrip) +- **Total overhead**: ~15-30ms per request + +### Scalability + +- **Per-org locks**: No contention between organizations +- **Global limit check**: No lock (eventual consistency acceptable) +- **Redis ZSET operations**: O(log N) where N = active executions +- **Cache**: Reduces DB load by ~95% + +### Resource Usage + +- **Redis memory per execution**: ~100 bytes (ZSET entry + cache entry) +- **Redis memory for 1000 concurrent executions**: ~100 KB +- **Database**: 1 row per organization with custom limit (typically < 100 rows) + +## Security Considerations + +### Denial of Service (DoS) Prevention + +The rate limiting system itself is designed to prevent DoS: +- Fail-open strategy prevents rate limiting infrastructure from becoming attack vector +- Per-org isolation prevents one org from affecting others +- Global limit prevents system-wide resource exhaustion + +### Bypass Attempts + +Rate limiting cannot be bypassed because: +- Enforced at application layer (before authentication/authorization) +- Uses Redis distributed locks (atomic operations) +- Cache invalidation is automatic (can't bypass by manipulating cache) + +### Monitoring and Alerting + +Recommended monitoring: +1. **Global limit usage**: Alert if >80% for extended periods +2. **Per-org limit usage**: Alert if >90% for important customers +3. **429 error rate**: Alert if spike occurs +4. **Redis connectivity**: Alert on connection failures + +## References + +### Code Locations + +- **Rate limiter**: `backend/api_v2/rate_limiter.py` +- **Constants**: `backend/api_v2/rate_limit_constants.py` +- **Model**: `backend/api_v2/models.py` (`OrganizationRateLimit`) +- **View layer (entry point)**: `backend/api_v2/api_deployment_views.py` (`DeploymentExecution.post`) +- **Helper layer**: `backend/api_v2/deployment_helper.py` (`execute_workflow`) +- **Auto-release**: `backend/workflow_manager/workflow_v2/models/execution.py` +- **Management commands**: `backend/api_v2/management/commands/` + +### Configuration Files + +- **Environment variables**: `backend/sample.env` +- **Django settings**: `backend/backend/settings/base.py` +- **Migration**: `backend/api_v2/migrations/0003_add_organization_rate_limit.py` + +### Related Documentation + +- Redis ZSET documentation: https://redis.io/docs/data-types/sorted-sets/ +- Django cache framework: https://docs.djangoproject.com/en/stable/topics/cache/ +- HTTP 429 status code: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429