Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion backend/api_v2/admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,37 @@
from django.contrib import admin

from .models import APIDeployment, APIKey
from .models import APIDeployment, APIKey, OrganizationRateLimit


@admin.register(OrganizationRateLimit)
Comment thread
chandrasekharan-zipstack marked this conversation as resolved.
class OrganizationRateLimitAdmin(admin.ModelAdmin):
list_display = [
"organization",
"concurrent_request_limit",
"created_at",
"modified_at",
]
list_filter = ["created_at", "modified_at"]
search_fields = ["organization__name", "organization__organization_id"]
readonly_fields = ["created_at", "modified_at"]
fieldsets = (
(
None,
{
"fields": (
"organization",
"concurrent_request_limit",
)
},
),
(
"Timestamps",
{
"fields": ("created_at", "modified_at"),
"classes": ("collapse",),
},
),
)


admin.site.register([APIDeployment, APIKey])
60 changes: 45 additions & 15 deletions backend/api_v2/api_deployment_views.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import uuid
from typing import Any

from configuration.models import Configuration
Expand All @@ -20,8 +21,9 @@
from api_v2.constants import ApiExecution
from api_v2.deployment_helper import DeploymentHelper
from api_v2.dto import DeploymentExecutionDTO
from api_v2.exceptions import NoActiveAPIKeyError
from api_v2.exceptions import NoActiveAPIKeyError, RateLimitExceeded
from api_v2.models import APIDeployment
from api_v2.rate_limiter import APIDeploymentRateLimiter
from api_v2.serializers import (
APIDeploymentListSerializer,
APIDeploymentSerializer,
Expand Down Expand Up @@ -67,6 +69,7 @@ def post(
) -> Response:
api: APIDeployment = deployment_execution_dto.api
api_key: str = deployment_execution_dto.api_key
organization = api.organization

serializer = ExecutionRequestSerializer(
data=request.data, context={"api": api, "api_key": api_key}
Expand All @@ -87,21 +90,48 @@ def post(
if presigned_urls:
DeploymentHelper.load_presigned_files(presigned_urls, file_objs)

response = DeploymentHelper.execute_workflow(
organization_name=org_name,
api=api,
file_objs=file_objs,
timeout=timeout,
include_metadata=include_metadata,
include_metrics=include_metrics,
use_file_history=use_file_history,
tag_names=tag_names,
llm_profile_id=llm_profile_id,
hitl_queue_name=hitl_queue_name,
hitl_packet_id=hitl_packet_id,
custom_data=custom_data,
request_headers=dict(request.headers),
# Generate execution ID for rate limiting
execution_id = str(uuid.uuid4())

# Check and acquire rate limit slot
can_proceed, limit_info = APIDeploymentRateLimiter.check_and_acquire(
organization, execution_id
)
if not can_proceed:
logger.warning(
f"Rate limit exceeded for org {organization.organization_id}: {limit_info}"
)
raise RateLimitExceeded(
current_usage=limit_info["current_usage"],
limit=limit_info["limit"],
limit_type=limit_info["limit_type"],
)

# Execute workflow with blanket exception handling
try:
response = DeploymentHelper.execute_workflow(
organization_name=org_name,
api=api,
file_objs=file_objs,
timeout=timeout,
include_metadata=include_metadata,
include_metrics=include_metrics,
use_file_history=use_file_history,
tag_names=tag_names,
llm_profile_id=llm_profile_id,
hitl_queue_name=hitl_queue_name,
hitl_packet_id=hitl_packet_id,
custom_data=custom_data,
request_headers=dict(request.headers),
execution_id=execution_id,
)
except Exception as error:
# Release slot on any failure during workflow setup/execution
APIDeploymentRateLimiter.release_slot(organization, execution_id)
logger.exception(f"Workflow execution failed: {error}")
raise
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Success - signal will handle slot release when workflow completes
if "error" in response and response["error"]:
logger.error("API deployment execution failed")
return Response(
Expand Down
18 changes: 18 additions & 0 deletions backend/api_v2/deployment_helper.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import uuid
from io import BytesIO
from typing import Any
from urllib.parse import urlencode, urlparse
Expand Down Expand Up @@ -33,6 +34,7 @@
)
from api_v2.key_helper import KeyHelper
from api_v2.models import APIDeployment, APIKey
from api_v2.rate_limiter import APIDeploymentRateLimiter
from api_v2.serializers import APIExecutionResponseSerializer
from api_v2.utils import APIDeploymentUtils

Expand Down Expand Up @@ -158,6 +160,7 @@ def execute_workflow(
hitl_packet_id: str | None = None,
custom_data: dict[str, Any] | None = None,
request_headers=None,
execution_id: str | None = None,
) -> ReturnDict:
"""Execute workflow by api.

Expand All @@ -172,10 +175,20 @@ def execute_workflow(
hitl_queue_name (str, optional): Custom queue name for manual review
hitl_packet_id (str, optional): Packet ID for packet-based review
custom_data (dict[str, Any], optional): JSON data for custom_data variable replacement in prompts
execution_id (str, optional): Pre-generated execution ID for rate limiting.
If None, a new UUID will be generated.

Returns:
ReturnDict: execution status/ result

Note:
Rate limiting is handled at the view layer. This method should be called
after rate limit checks have passed, with a pre-acquired execution_id.
"""
# Use provided execution_id or generate one (for backward compatibility)
if execution_id is None:
execution_id = str(uuid.uuid4())

workflow_id = api.workflow.id
pipeline_id = api.id
if hitl_queue_name:
Expand All @@ -186,6 +199,7 @@ def execute_workflow(
workflow_execution = WorkflowExecutionServiceHelper.create_workflow_execution(
workflow_id=workflow_id,
pipeline_id=pipeline_id,
execution_id=execution_id,
mode=WorkflowExecution.Mode.QUEUE,
tags=tags,
total_files=len(file_objs),
Expand Down Expand Up @@ -264,6 +278,10 @@ def execute_workflow(
if not include_metrics:
result.remove_result_metrics()
except Exception as error:
# Release rate limit slot (workflow setup/dispatch failed, async job not started)
APIDeploymentRateLimiter.release_slot(api.organization, str(execution_id))

# Clean up storage
DestinationConnector.delete_api_storage_dir(
workflow_id=workflow_id, execution_id=execution_id
)
Expand Down
31 changes: 31 additions & 0 deletions backend/api_v2/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,37 @@ def __init__(
super().__init__(detail, code)


class RateLimitExceeded(APIException):
status_code = 429
default_detail = "Rate limit exceeded"

def __init__(
self,
current_usage: int = 0,
limit: int = 0,
limit_type: str = "organization",
detail: str | None = None,
code: str | None = None,
):
from api_v2.rate_limit_constants import RateLimitMessages

self.current_usage = current_usage
self.limit = limit
self.limit_type = limit_type

if detail is None:
if limit_type == "organization":
detail = RateLimitMessages.get_org_limit_exceeded_message(
current_usage=current_usage, limit=limit
)
else:
detail = RateLimitMessages.get_global_limit_exceeded_message(
current_usage=current_usage, limit=limit
)
Comment thread
chandrasekharan-zipstack marked this conversation as resolved.

super().__init__(detail, code)


class PresignedURLFetchError(APIException):
default_detail = "Failed to fetch file from presigned URL"

Expand Down
Empty file.
Empty file.
158 changes: 158 additions & 0 deletions backend/api_v2/management/commands/clear_org_rate_limit_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from account_v2.models import Organization
from django.core.cache import cache
from django.core.management.base import BaseCommand

from api_v2.models import OrganizationRateLimit
from api_v2.rate_limit_constants import RateLimitKeys


class Command(BaseCommand):
help = (
"Clear rate limit cache for organizations (useful after changing default limit)"
)

def add_arguments(self, parser):
parser.add_argument(
"--org-id",
type=str,
help="Clear cache for specific organization ID or name (default: all orgs)",
)
parser.add_argument(
"--all",
action="store_true",
help="Clear cache for ALL organizations (with or without custom limits)",
)

def handle(self, *args, **options):
org_id = options.get("org_id")
clear_all = options["all"]

if org_id:
# Clear cache for specific organization
self._clear_org_cache(org_id)
elif clear_all:
# Clear cache for ALL organizations
self._clear_all_orgs_cache()
else:
# Clear cache for organizations with custom limits
self._clear_custom_limits_cache()

def _clear_org_cache(self, org_id: str):
"""Clear cache for a specific organization."""
# Get organization
try:
organization = Organization.objects.get(organization_id=org_id)
except Organization.DoesNotExist:
try:
organization = Organization.objects.get(name=org_id)
except Organization.DoesNotExist:
self.stdout.write(self.style.ERROR(f'Organization "{org_id}" not found'))
return

cache_key = RateLimitKeys.get_org_limit_cache_key(
str(organization.organization_id)
)
cache.delete(cache_key)

self.stdout.write(
self.style.SUCCESS(
f"✓ Cleared cache for {organization.name} ({organization.organization_id})"
)
)

def _clear_custom_limits_cache(self):
"""Clear cache for organizations with custom rate limits."""
org_limits = OrganizationRateLimit.objects.select_related("organization").all()

if not org_limits:
self.stdout.write(
self.style.WARNING("No custom rate limits found - nothing to clear")
)
return

count = 0
for org_limit in org_limits:
org_id = str(org_limit.organization.organization_id)
cache_key = RateLimitKeys.get_org_limit_cache_key(org_id)
cache.delete(cache_key)
count += 1

self.stdout.write(
self.style.SUCCESS(
f"✓ Cleared cache for {count} organizations with custom limits"
)
)

def _clear_all_orgs_cache(self):
"""Clear cache for ALL organizations (with or without custom limits)."""
self.stdout.write(
self.style.WARNING(
"Clearing cache for ALL organizations (including those using defaults)..."
)
)

# Try pattern-based deletion first (works with Redis cache backend)
if self._try_pattern_delete():
self.stdout.write(
self.style.SUCCESS(
"✓ Cleared all organization rate limit caches using pattern deletion"
)
)
else:
# Fallback: iterate through all organizations
self._clear_all_orgs_individually()

self.stdout.write(
self.style.WARNING(
"Note: Cache will be repopulated on next API request for each org"
)
)

def _try_pattern_delete(self) -> bool:
"""Try to delete cache keys using pattern (Redis-specific).

Returns:
True if pattern deletion succeeded, False if not supported
"""
try:
# Check if cache backend supports delete_pattern (Redis cache)
if hasattr(cache, "delete_pattern"):
pattern = RateLimitKeys.ORG_LIMIT_CACHE_KEY_PATTERN.replace(
"{org_id}", "*"
)
deleted_count = cache.delete_pattern(pattern)
self.stdout.write(f"Deleted {deleted_count} cache keys using pattern")
return True
return False
except Exception as e:
self.stdout.write(self.style.WARNING(f"Pattern deletion failed: {e}"))
return False

def _clear_all_orgs_individually(self):
"""Fallback: Clear cache by iterating through all organizations."""
organizations = Organization.objects.all()
count = organizations.count()

if count == 0:
self.stdout.write(self.style.WARNING("No organizations found"))
return

# Confirm for large number of orgs
if count > 50:
self.stdout.write(
f"This will clear cache for {count} organizations individually."
)
confirm = input("Continue? [y/N]: ")
if confirm.lower() != "y":
self.stdout.write(self.style.WARNING("Cancelled"))
return

cleared = 0
for org in organizations:
cache_key = RateLimitKeys.get_org_limit_cache_key(str(org.organization_id))
cache.delete(cache_key)
cleared += 1

self.stdout.write(
self.style.SUCCESS(f"✓ Cleared cache for {cleared} organizations")
)
Loading