Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -566,25 +566,32 @@ async def get_allowed_mcp_servers(
)
)

key_access_group_extras = (
await MCPRequestHandler._get_key_access_group_mcp_server_extras(
user_api_key_auth
)
)

#########################################################
# Calculate key/team allowed servers using inheritance and intersection logic
#########################################################
allowed_mcp_servers: List[str] = []
has_lower_level_mcp_restrictions = (
len(allowed_mcp_servers_for_key) > 0
or len(allowed_mcp_servers_for_team) > 0
)
if len(allowed_mcp_servers_for_team) > 0:
if len(allowed_mcp_servers_for_key) > 0:
# Key has its own MCP permissions - use intersection with team permissions
for _mcp_server in allowed_mcp_servers_for_key:
if _mcp_server in allowed_mcp_servers_for_team:
allowed_mcp_servers.append(_mcp_server)
else:
# Key has no MCP permissions - inherit from team
allowed_mcp_servers = allowed_mcp_servers_for_team
key_set = set(allowed_mcp_servers_for_key)
team_set = set(allowed_mcp_servers_for_team)
extras_set = set(key_access_group_extras)

has_lower_level_mcp_restrictions = bool(key_set or team_set or extras_set)

# 1. Team-gated base scope.
if not team_set:
base = key_set # no team restriction
elif not key_set:
base = team_set # key has no own perms → inherits team
else:
allowed_mcp_servers = allowed_mcp_servers_for_key
base = key_set & team_set # both restrict → intersect

# 2. Extend with access-group extras (LIT-3189 — bypasses team
# ceiling, gated by group's assigned_team_ids / assigned_key_ids).
allowed_mcp_servers: List[str] = list(base | extras_set)

#########################################################
# Check end_user permissions if end_user_id is set
Expand Down Expand Up @@ -877,6 +884,43 @@ def is_tool_allowed(
return True
return False

@staticmethod
async def _get_key_access_group_mcp_server_extras(
user_api_key_auth: Optional[UserAPIKeyAuth] = None,
) -> List[str]:
"""
Resolve the key's unified `access_group_ids` (LiteLLM_AccessGroupTable) to
MCP server IDs, gated by the access group's `assigned_team_ids` /
`assigned_key_ids`. These servers extend the team's MCP scope rather
than being capped by it. Tag-style `mcp_access_groups` (per-server tags)
are intentionally not handled here — they have no assignment fields and
remain subject to the team ceiling.
"""
if user_api_key_auth is None:
return []
try:
from litellm.proxy._experimental.mcp_server.mcp_server_manager import (
global_mcp_server_manager,
)
from litellm.proxy.auth.auth_checks import (
get_authorized_resources_from_key_access_groups,
)

raw_server_ids = await get_authorized_resources_from_key_access_groups(
valid_token=user_api_key_auth,
team_object=None,
resource_field="access_mcp_server_ids",
)
if not raw_server_ids:
return []
# Permission entries may be server_ids OR names/aliases — expand to ids.
return global_mcp_server_manager.expand_permission_list(raw_server_ids)
except Exception as e:
verbose_logger.warning(
f"Failed to get key access group MCP server extras: {str(e)}"
)
return []

@staticmethod
async def _get_allowed_mcp_servers_for_key(
user_api_key_auth: Optional[UserAPIKeyAuth] = None,
Expand Down
61 changes: 38 additions & 23 deletions litellm/proxy/auth/auth_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3163,44 +3163,40 @@ async def can_team_access_model(
raise


async def _key_access_group_grants_model(
model: Union[str, List[str]],
async def get_authorized_resources_from_key_access_groups(
valid_token: Optional[UserAPIKeyAuth],
team_object: Optional[LiteLLM_TeamTable],
llm_router: Optional[Router],
) -> bool:
resource_field: Literal[
"access_model_names", "access_mcp_server_ids", "access_agent_ids"
],
) -> List[str]:
"""
Returns True if the key's `access_group_ids` expand to models that grant
access to `model`. Used to let a key's access group override a team's
model restriction in `common_checks`.

A key's access group only counts if the access group itself authorizes the
caller as an owner — that is, the group's `assigned_team_ids` includes the
key's `team_id`, or the group's `assigned_key_ids` includes the key's
token. This preserves the team-as-owner boundary (a team member cannot
escalate by naming a group assigned to a different team) while still
letting a group reach the key without first being added to the team's
`access_group_ids` list.
For each access_group_id on the key, fetch the LiteLLM_AccessGroupTable row
and contribute its `resource_field` only if the group authorizes the caller
as an owner — that is, the group's `assigned_team_ids` includes the key's
`team_id`, or the group's `assigned_key_ids` includes the key's token. This
preserves the team-as-owner boundary while still letting a group reach the
key without first being added to the team's `access_group_ids` list.
"""
if valid_token is None:
return False
return []
key_access_group_ids = list(valid_token.access_group_ids or [])
if not key_access_group_ids:
return False
return []

from litellm.proxy.proxy_server import prisma_client as _prisma_client
from litellm.proxy.proxy_server import proxy_logging_obj as _proxy_logging_obj
from litellm.proxy.proxy_server import user_api_key_cache as _user_api_key_cache

if _prisma_client is None or _user_api_key_cache is None:
return False
return []

key_team_id = valid_token.team_id or (
team_object.team_id if team_object is not None else None
)
key_token = valid_token.token

authorized_models: List[str] = []
authorized_resources: List[str] = []
for ag_id in key_access_group_ids:
try:
ag = await get_access_object(
Expand All @@ -3216,17 +3212,36 @@ async def _key_access_group_grants_model(
)
key_authorized = bool(key_token and key_token in (ag.assigned_key_ids or []))
if team_authorized or key_authorized:
authorized_models.extend(ag.access_model_names or [])
authorized_resources.extend(getattr(ag, resource_field, []) or [])

return list(set(authorized_resources))


async def _key_access_group_grants_model(
model: Union[str, List[str]],
valid_token: Optional[UserAPIKeyAuth],
team_object: Optional[LiteLLM_TeamTable],
llm_router: Optional[Router],
) -> bool:
"""
Returns True if the key's `access_group_ids` expand to models that grant
access to `model`. Used to let a key's access group override a team's
model restriction in `common_checks`.
"""
authorized_models = await get_authorized_resources_from_key_access_groups(
valid_token=valid_token,
team_object=team_object,
resource_field="access_model_names",
)
if not authorized_models:
return False
try:
_can_object_call_model(
model=model,
llm_router=llm_router,
models=list(set(authorized_models)),
team_model_aliases=valid_token.team_model_aliases,
team_id=valid_token.team_id,
models=authorized_models,
team_model_aliases=valid_token.team_model_aliases if valid_token else None,
team_id=valid_token.team_id if valid_token else None,
object_type="key",
)
return True
Expand Down
Loading
Loading