UN-2470 [FEAT] Remove Django dependency from Celery workers with internal APIs #1494
Conversation
This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead. Key changes:

- Created separate worker modules (api-deployment, callback, file_processing, general)
- Added internal API endpoints for worker communication
- Implemented Django-free task execution where appropriate
- Added shared utilities and client facades
- Updated container configurations for new worker architecture

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
Set up Docker for the new workers

- Add executable permissions to worker entrypoint files
- Fix import order in namespace package __init__.py
- Remove unused variable api_status in general worker
- Address ruff E402 and F841 errors

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
…-MISC-remove-django-dependency-from-celery-workers
Caution: Review failed. The pull request is closed.

Summary by CodeRabbit
Walkthrough

Adds a broad internal service surface and auth for worker-to-backend communication, centralized internal API constants, organization utilities, many worker-facing internal endpoints/serializers/URL modules, shared core datamodels and file operations for backend↔worker contracts, connector and plugin loader improvements, cache/log utilities, unified worker image and compose wiring, and new worker task implementations.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant Worker as Worker (internal client)
participant Middleware as InternalAPIAuthMiddleware
participant InternalAPI as Django internal endpoints
participant OrgUtil as organization_utils
participant DB as Database / Services
Worker->>Middleware: Request to /internal/... (Authorization: Bearer, X-Organization-ID)
Middleware->>Middleware: validate Authorization vs INTERNAL_SERVICE_API_KEY
alt auth ok
Middleware->>OrgUtil: resolve_organization(X-Organization-ID)
OrgUtil-->>Middleware: Organization | None
Middleware-->>InternalAPI: annotate request with organization context and authenticated_via
InternalAPI->>DB: perform organization-scoped query or enqueue work
DB-->>InternalAPI: data/result
InternalAPI-->>Worker: JSON response (200/202/404/500)
Middleware->>Middleware: process_response clears org context
else auth fail
Middleware-->>Worker: 401 Unauthorized (JSON error)
    end
```
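The flow above maps naturally onto a small Django middleware. A minimal sketch, assuming standard Django middleware hooks — the header and attribute names come from the diagram, everything else is illustrative:

```python
import os

from django.http import JsonResponse


class InternalAPIAuthMiddleware:
    """Validates worker requests to /internal/ against the shared service key."""

    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.path.startswith("/internal/"):
            expected = f"Bearer {os.environ.get('INTERNAL_SERVICE_API_KEY', '')}"
            provided = request.headers.get("Authorization", "")
            if not os.environ.get("INTERNAL_SERVICE_API_KEY") or provided != expected:
                return JsonResponse({"error": "Unauthorized"}, status=401)
            # Annotate the request with org context resolved from the header;
            # per the diagram, the real middleware also clears this context
            # again in process_response.
            request.organization_id = request.headers.get("X-Organization-ID")
            request.authenticated_via = "internal_api_key"
        return self.get_response(request)
```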
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting

📒 Files selected for processing (4)
- Fix flawed singleton implementation that was trying to share BaseAPIClient instances
- Now properly shares HTTP sessions between specialized clients
- Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
- Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
- Added debug logging to verify singleton pattern activation
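The pattern described — one underlying HTTP session shared by several thin clients — might look roughly like this sketch; `BaseAPIClient` is named in the commit, but the structure here is an illustration, not the PR's actual code:

```python
import requests

_shared_session = None


def get_shared_session() -> requests.Session:
    """Lazily create a single HTTP session reused by every specialized client."""
    global _shared_session
    if _shared_session is None:
        _shared_session = requests.Session()
    return _shared_session


class BaseAPIClient:
    """Each client keeps its own config but shares one connection pool."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url
        self.session = get_shared_session()  # shared, not per-instance


class WorkflowClient(BaseAPIClient):
    pass


class PipelineClient(BaseAPIClient):
    pass
```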
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
runner/src/unstract/runner/controller/health.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
```python
@health_bp.route("/health", methods=["GET"])
def health_check() -> str:
    return "OK"
```
Restore backward-compatible /ping endpoint.
Switching the health check route from /ping to /health with no alias will break any existing readiness/liveness probes or integrations still hitting /ping. Unless every deployment and worker config is being updated in lockstep, this is a high-risk change. Please keep /ping alive (even as a simple alias) until consumers are migrated.
```diff
 @health_bp.route("/health", methods=["GET"])
 def health_check() -> str:
     return "OK"
+
+@health_bp.route("/ping", methods=["GET"])
+def ping() -> str:
+    return "OK"
```

📝 Committable suggestion
```python
@health_bp.route("/health", methods=["GET"])
def health_check() -> str:
    return "OK"

@health_bp.route("/ping", methods=["GET"])
def ping() -> str:
    return "OK"
```
🤖 Prompt for AI Agents
In runner/src/unstract/runner/controller/health.py around lines 12 to 14,
restore backward-compatible /ping by adding /ping as an alias for the existing
health_check route so both /health and /ping return the same "OK" response;
modify the route decorator to include methods=["GET"] for both paths or register
an additional route for "/ping" pointing to health_check, ensuring no behavior
change and keeping the single health_check function as the implementation.
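If a single handler is preferred, Flask also allows stacking route decorators, which matches the prompt's second option (a sketch, assuming the `health_bp` blueprint from the diff above):

```python
@health_bp.route("/health", methods=["GET"])
@health_bp.route("/ping", methods=["GET"])  # backward-compatible alias
def health_check() -> str:
    return "OK"
```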
…workers' of github.com:Zipstack/unstract into feat/UN-2470-MISC-remove-django-dependency-from-celery-workers
…rom worker for ws
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/workflow_manager/workflow_v2/execution_log_utils.py (1)
51-55: Log parsing failures silently drop queue items.

`LogDataDTO.from_json` returns `None` on parse failure. In that case we have already consumed the entry from Redis but neither increment `skipped_count` nor log anything, so the log just evaporates. Please log and count these cases (ideally push them to the skipped tally) so operators can detect and investigate corrupt payloads instead of losing them silently.
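A hedged sketch of the suggested handling — `LogDataDTO` and `skipped_count` are the names used in this comment, while the surrounding loop and function are illustrative:

```python
import logging

logger = logging.getLogger(__name__)


def consume_log_entries(entries, skipped_count: int = 0):
    """Parse queued log entries, counting unparseable ones instead of dropping them."""
    parsed = []
    for raw_entry in entries:
        log_data = LogDataDTO.from_json(raw_entry)  # returns None on parse failure
        if log_data is None:
            # The entry was already popped from Redis; surface the loss to operators.
            logger.warning("Skipping unparseable execution log entry: %r", raw_entry[:200])
            skipped_count += 1
            continue
        parsed.append(log_data)
    return parsed, skipped_count
```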
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
⛔ Files ignored due to path filters (1)
`uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (9)
- .gitignore (3 hunks)
- backend/utils/cache_service.py (4 hunks)
- backend/workflow_manager/workflow_v2/execution_log_internal_urls.py (1 hunks)
- backend/workflow_manager/workflow_v2/execution_log_internal_views.py (1 hunks)
- backend/workflow_manager/workflow_v2/execution_log_utils.py (4 hunks)
- backend/workflow_manager/workflow_v2/workflow_helper.py (5 hunks)
- docker/docker-compose.yaml (1 hunks)
- unstract/core/src/unstract/core/log_utils.py (1 hunks)
- workers/callback/tasks.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- backend/workflow_manager/workflow_v2/execution_log_internal_urls.py
- .gitignore
- backend/workflow_manager/workflow_v2/workflow_helper.py
🧰 Additional context used
🪛 Ruff (0.13.1)
unstract/core/src/unstract/core/log_utils.py
43-43: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
117-117: Do not catch blind exception: Exception
(BLE001)
118-118: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
backend/workflow_manager/workflow_v2/execution_log_internal_views.py
152-152: Unused method argument: request
(ARG002)
backend/utils/cache_service.py
100-100: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
100-100: Use explicit conversion flag
Replace with conversion flag
(RUF010)
106-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
107-107: Use explicit conversion flag
Replace with conversion flag
(RUF010)
109-109: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
workers/callback/tasks.py
104-104: Do not catch blind exception: Exception
(BLE001)
190-190: Consider moving this statement to an else block
(TRY300)
192-192: Do not catch blind exception: Exception
(BLE001)
199-199: Unused function argument: execution_id
(ARG001)
232-232: Consider moving this statement to an else block
(TRY300)
321-321: Do not catch blind exception: Exception
(BLE001)
418-418: Consider moving this statement to an else block
(TRY300)
543-543: Do not catch blind exception: Exception
(BLE001)
556-556: Use explicit conversion flag
Replace with conversion flag
(RUF010)
571-571: Unused function argument: workflow_id
(ARG001)
619-623: Consider moving this statement to an else block
(TRY300)
625-625: Do not catch blind exception: Exception
(BLE001)
635-635: Unused function argument: results
(ARG001)
658-658: Avoid specifying long messages outside the exception class
(TRY003)
676-678: Abstract raise to an inner function
(TRY301)
676-678: Avoid specifying long messages outside the exception class
(TRY003)
710-712: Avoid specifying long messages outside the exception class
(TRY003)
728-730: Avoid specifying long messages outside the exception class
(TRY003)
793-793: Do not catch blind exception: Exception
(BLE001)
794-794: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
795-795: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
795-795: Avoid specifying long messages outside the exception class
(TRY003)
799-799: Avoid specifying long messages outside the exception class
(TRY003)
828-828: Consider moving this statement to an else block
(TRY300)
829-829: Do not catch blind exception: Exception
(BLE001)
854-854: Do not catch blind exception: Exception
(BLE001)
947-947: Do not catch blind exception: Exception
(BLE001)
948-950: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
967-967: Do not catch blind exception: Exception
(BLE001)
968-968: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1006-1006: Do not catch blind exception: Exception
(BLE001)
1023-1023: Do not catch blind exception: Exception
(BLE001)
1041-1041: Do not catch blind exception: Exception
(BLE001)
1045-1048: Avoid specifying long messages outside the exception class
(TRY003)
1051-1051: Avoid specifying long messages outside the exception class
(TRY003)
1104-1104: Do not catch blind exception: Exception
(BLE001)
1105-1107: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1144-1144: Do not catch blind exception: Exception
(BLE001)
1145-1145: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1147-1147: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1175-1175: String contains ambiguous ℹ (INFORMATION SOURCE). Did you mean i (LATIN SMALL LETTER I)?
(RUF001)
1182-1182: Do not catch blind exception: Exception
(BLE001)
1224-1224: Unused function argument: args
(ARG001)
1251-1251: Avoid specifying long messages outside the exception class
(TRY003)
1335-1335: Do not catch blind exception: Exception
(BLE001)
1342-1342: Consider moving this statement to an else block
(TRY300)
1345-1347: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1362-1362: Do not catch blind exception: Exception
(BLE001)
1363-1363: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1418-1418: Unused function argument: args
(ARG001)
1441-1441: Avoid specifying long messages outside the exception class
(TRY003)
1450-1450: Avoid specifying long messages outside the exception class
(TRY003)
1457-1457: Create your own exception
(TRY002)
1457-1457: Avoid specifying long messages outside the exception class
(TRY003)
1489-1489: Abstract raise to an inner function
(TRY301)
1584-1584: Consider moving this statement to an else block
(TRY300)
1587-1589: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1608-1608: Do not catch blind exception: Exception
(BLE001)
1609-1609: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1617-1617: Unused function argument: execution_status
(ARG001)
1682-1682: Do not catch blind exception: Exception
(BLE001)
1683-1685: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1684-1684: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1691-1691: Unused function argument: execution_status
(ARG001)
1756-1756: Do not catch blind exception: Exception
(BLE001)
1757-1759: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1758-1758: Use explicit conversion flag
Replace with conversion flag
(RUF010)
🔇 Additional comments (3)
docker/docker-compose.yaml (1)
367-368: Align GENERAL_METRICS_PORT with exposed container port
`GENERAL_METRICS_PORT` is set to 8081, but the container exposes 8082 (via `ports: "8088:8082"` and `HEALTH_PORT=8082`). Metrics will publish on the wrong port and become unreachable. Please make the metrics port consistent with the exposed container port.

```diff
-      - GENERAL_METRICS_PORT=8081
+      - GENERAL_METRICS_PORT=8082
```

workers/callback/tasks.py (2)
841-847: Stop shadowing the cache manager import.

Line 842 re-imports `get_cache_manager` from `shared.cache`, overwriting the already-initialized instance from `shared.infrastructure.caching.cache_utils`. In worker-only deployments that module is absent, so this raises `ImportError` and the execution cache never clears. Reuse the module-level import instead.

```diff
-        from shared.cache import get_cache_manager
-        cache_manager = get_cache_manager()
```
503-511: Use ISO-8601 timestamps for `last_run_time`.

Line 509 currently submits `time.time()` to `update_pipeline_status`, but the receiving internal API/serializer expects an ISO-8601 string (the field is a `DateTimeField`). Sending a float causes validation to fail and the pipeline update call falls back to the failure path, so the pipeline never records its last run metadata. Switch to an aware UTC timestamp and import `datetime`/`timezone` at the top.

```diff
+from datetime import datetime, timezone
@@
-            last_run_time=time.time(),
+            last_run_time=datetime.now(timezone.utc).isoformat(),
```
```python
        except (ConnectionError, TimeoutError, OSError) as e:
            logger.error(f"Failed to clear cache pattern '{key_pattern}': {str(e)}")
            # Fallback to old method for backward compatibility
            try:
                cache.delete_pattern(key_pattern)
                logger.warning(f"Used fallback delete_pattern for '{key_pattern}'")
            except (ConnectionError, TimeoutError, OSError) as fallback_error:
                logger.error(
                    f"Fallback cache clearing also failed: {str(fallback_error)}"
                )
                raise e
```
Fix Redis-specific exceptions in the fallback path
`redis_cache.scan()` and the pipeline operations raise `redis.exceptions.ConnectionError` / `redis.exceptions.TimeoutError`, not the built-in classes currently captured. As a result the fallback never runs when Redis disconnects, defeating the resilience this block is supposed to provide. Please catch the redis-py exceptions (or their common base) and re-raise with `raise` to preserve the original traceback.
```diff
-        except (ConnectionError, TimeoutError, OSError) as e:
+        except (RedisConnectionError, RedisTimeoutError, OSError) as e:
@@
-            except (ConnectionError, TimeoutError, OSError) as fallback_error:
+            except (RedisConnectionError, RedisTimeoutError, OSError) as fallback_error:
@@
-                raise e
+                raise
```

Add near the imports:

```python
from redis.exceptions import ConnectionError as RedisConnectionError, TimeoutError as RedisTimeoutError
```

🧰 Tools
🪛 Ruff (0.13.1)
100-100: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
100-100: Use explicit conversion flag
Replace with conversion flag
(RUF010)
106-108: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
107-107: Use explicit conversion flag
Replace with conversion flag
(RUF010)
109-109: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
🤖 Prompt for AI Agents
In backend/utils/cache_service.py around lines 99 to 109, the except is catching
built-in ConnectionError/TimeoutError/OSError but redis-py raises
redis.exceptions.ConnectionError/TimeoutError, so the fallback path never
executes on Redis disconnects; update imports to bring in
redis.exceptions.ConnectionError and redis.exceptions.TimeoutError (aliased to
avoid name collision) and change the except clause to catch those Redis-specific
exceptions (or their common base) in addition to OSError; in the fallback except
re-raise using plain raise (not raise e) to preserve the original traceback.
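Putting the suggestion together, a self-contained sketch of the resulting pattern; the function and parameter names are illustrative stand-ins for the code in backend/utils/cache_service.py:

```python
import logging

from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import TimeoutError as RedisTimeoutError

logger = logging.getLogger(__name__)


def clear_cache_pattern(redis_cache, cache, key_pattern: str) -> None:
    """Delete keys matching a pattern, falling back to the legacy helper."""
    try:
        for key in redis_cache.scan_iter(match=key_pattern):
            redis_cache.delete(key)
    except (RedisConnectionError, RedisTimeoutError, OSError):
        logger.exception("Failed to clear cache pattern %r", key_pattern)
        try:
            cache.delete_pattern(key_pattern)  # legacy fallback path
            logger.warning("Used fallback delete_pattern for %r", key_pattern)
        except (RedisConnectionError, RedisTimeoutError, OSError):
            logger.exception("Fallback cache clearing also failed")
            raise  # plain raise keeps the traceback chained to the original error
```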
```diff
         processed_count = 0
         for organization_id, logs in organization_logs.items():
-            logger.info(f"Storing '{len(logs)}' logs for org: {organization_id}")
+            logger.info(f"Storing {len(logs)} logs for org: {organization_id}")
             ExecutionLog.objects.bulk_create(objs=logs, ignore_conflicts=True)
             processed_count += len(logs)

         return {
             "processed_count": processed_count,
             "skipped_count": skipped_count,
             "total_logs": logs_count,
```
Processed count can over-report when `ignore_conflicts=True`.

`processed_count` is incremented by `len(logs)` even when `bulk_create(..., ignore_conflicts=True)` silently skips rows due to conflicts. That inflates the reported processed total and hides deduplications. Either drop `ignore_conflicts=True`, or capture the actual inserted row count (e.g., by checking for primary keys on returned objects or deduplicating before insert) so the summary reflects reality.
🤖 Prompt for AI Agents
In backend/workflow_manager/workflow_v2/execution_log_utils.py around lines
112-121 the code increments processed_count by len(logs) while calling
ExecutionLog.objects.bulk_create(..., ignore_conflicts=True), which can
over-report when rows are skipped due to conflicts; instead deduplicate or
detect actual inserts before updating counters: compute a deduplicated list of
log objects (by the unique key(s) that would cause conflicts, e.g.,
(organization_id, external_id, timestamp) or the model's unique fields) and use
len(deduped_logs) when calling bulk_create and when incrementing
processed_count, or alternatively dedupe into a set of unique identifiers and
only bulk_create that set so the reported processed_count reflects the number of
attempted inserts that will not be ignored.
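A hedged sketch of the dedup-before-insert approach the prompt describes; the unique-key fields on `ExecutionLog` are assumptions for illustration and should be replaced with the model's real unique fields:

```python
def persist_logs(organization_logs: dict) -> int:
    """Bulk-insert logs per organization, counting only non-duplicate attempts."""
    processed_count = 0
    for organization_id, logs in organization_logs.items():
        seen = set()
        deduped = []
        for log in logs:
            # Assumed unique key; substitute the model's actual unique fields.
            key = (organization_id, log.execution_id, log.event_time)
            if key not in seen:
                seen.add(key)
                deduped.append(log)
        ExecutionLog.objects.bulk_create(objs=deduped, ignore_conflicts=True)
        processed_count += len(deduped)  # attempted, non-duplicate inserts
    return processed_count
```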
```python
    return redis.Redis(
        host=host,
        port=port,
        username=username,
        password=password,
        decode_responses=False,  # Keep as bytes for consistency
        **kwargs,
    )
```
Allow overriding decode_responses without raising TypeError.

We currently pass `decode_responses=False` explicitly and also expose `**kwargs` for additional config. If a caller provides `decode_responses` inside `**kwargs` (a very reasonable use case for callers needing strings instead of bytes), `redis.Redis` sees the argument twice and raises `TypeError: got multiple values for keyword argument 'decode_responses'`, breaking worker startup. Please pull the value out of kwargs (defaulting to False) before invoking `redis.Redis`.
```diff
-    return redis.Redis(
-        host=host,
-        port=port,
-        username=username,
-        password=password,
-        decode_responses=False,  # Keep as bytes for consistency
-        **kwargs,
-    )
+    decode_responses = kwargs.pop("decode_responses", False)
+    return redis.Redis(
+        host=host,
+        port=port,
+        username=username,
+        password=password,
+        decode_responses=decode_responses,  # bytes by default; callers can override
+        **kwargs,
+    )
```

📝 Committable suggestion
```python
    # Allow callers to override decode_responses without collision
    decode_responses = kwargs.pop("decode_responses", False)
    return redis.Redis(
        host=host,
        port=port,
        username=username,
        password=password,
        decode_responses=decode_responses,  # bytes by default; callers can override
        **kwargs,
    )
```
🤖 Prompt for AI Agents
In unstract/core/src/unstract/core/log_utils.py around lines 140 to 147, the
function currently hardcodes decode_responses=False while also forwarding
**kwargs, which will raise TypeError if callers pass decode_responses in kwargs;
change it to pop decode_responses from kwargs with a default of False (e.g.
decode_responses = kwargs.pop("decode_responses", False)) and then pass that
variable to redis.Redis instead of the hardcoded literal, leaving the remaining
kwargs intact.
```python
    task_time_limit=_get_callback_timeouts()[
        0
    ],  # Configurable hard timeout (default: 1 hour)
    task_soft_time_limit=_get_callback_timeouts()[
        1
    ],  # Configurable soft timeout (default: 55 minutes)
```
Celery is ignoring the configured timeouts.
Celery's task options are `time_limit` / `soft_time_limit`; `task_time_limit` keys are ignored. As written, neither callback task enforces the intended 1h/55m limits, so long-running executions never time out. Rename the options to the supported keys.
```diff
-    task_time_limit=_get_callback_timeouts()[0],
-    task_soft_time_limit=_get_callback_timeouts()[1],
+    time_limit=_get_callback_timeouts()[0],
+    soft_time_limit=_get_callback_timeouts()[1],
```

(and the same change for `process_batch_callback_api`)
Also applies to: 1406-1411
🤖 Prompt for AI Agents
In workers/callback/tasks.py around lines 1374-1379 (and also lines 1406-1411),
the Celery task options use unsupported keys `task_time_limit` and
`task_soft_time_limit`, so configured timeouts are ignored; rename these keys to
Celery-supported `time_limit` and `soft_time_limit` respectively for both
callback task definitions, preserving the same values returned by
_get_callback_timeouts(), so the hard (1h) and soft (55m) limits are actually
enforced.
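For reference, a minimal standalone example of the supported Celery options together with soft-limit handling; the task body and helper names are illustrative:

```python
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded


@shared_task(
    bind=True,
    time_limit=3600,       # hard limit: the worker kills the task at 1 h
    soft_time_limit=3300,  # soft limit: SoftTimeLimitExceeded raised at 55 min
)
def process_batch_callback(self, execution_id: str) -> None:
    try:
        run_callback(execution_id)  # illustrative work function
    except SoftTimeLimitExceeded:
        # Clean up before the hard limit terminates the process.
        mark_execution_timed_out(execution_id)  # illustrative cleanup helper
        raise
```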
…workers' of github.com:Zipstack/unstract into feat/UN-2470-MISC-remove-django-dependency-from-celery-workers
Actionable comments posted: 0
🧹 Nitpick comments (3)
workers/callback/tasks.py (3)
296-303: Ensure org context is set before querying execution details

Guarantees the first execution fetch won't 401/403 on org-scoped APIs.

Apply:

```diff
     expected_files = 0
     try:
+        # Ensure org context is set before querying execution details
+        api_client.set_organization_context(organization_id)
         execution_response = api_client.get_workflow_execution(execution_id)
```
961-961: Use typing.Any (not built-in any) in return type

Fixes type hinting.

Apply:

```diff
-def _get_execution_directories(context: CallbackContext) -> list[tuple[str, any, str]]:
+def _get_execution_directories(context: CallbackContext) -> list[tuple[str, Any, str]]:
```
590-593: Reflect actual status in notification log

Don't hardcode "execution completed"; include the real status.

Apply:

```diff
-    logger.info(
-        f"Triggering notifications for target_id={pipeline_id} (execution completed)"
-    )
+    logger.info(
+        f"Triggering notifications for target_id={pipeline_id} (status={status})"
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
workers/callback/tasks.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
workers/callback/tasks.py
93-93: Do not catch blind exception: Exception
(BLE001)
179-179: Consider moving this statement to an else block
(TRY300)
181-181: Do not catch blind exception: Exception
(BLE001)
188-188: Unused function argument: execution_id
(ARG001)
221-221: Consider moving this statement to an else block
(TRY300)
310-310: Do not catch blind exception: Exception
(BLE001)
407-407: Consider moving this statement to an else block
(TRY300)
532-532: Do not catch blind exception: Exception
(BLE001)
545-545: Use explicit conversion flag
Replace with conversion flag
(RUF010)
560-560: Unused function argument: workflow_id
(ARG001)
608-612: Consider moving this statement to an else block
(TRY300)
614-614: Do not catch blind exception: Exception
(BLE001)
624-624: Unused function argument: results
(ARG001)
647-647: Avoid specifying long messages outside the exception class
(TRY003)
665-667: Abstract raise to an inner function
(TRY301)
665-667: Avoid specifying long messages outside the exception class
(TRY003)
699-701: Avoid specifying long messages outside the exception class
(TRY003)
717-719: Avoid specifying long messages outside the exception class
(TRY003)
782-782: Do not catch blind exception: Exception
(BLE001)
783-783: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
784-784: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
784-784: Avoid specifying long messages outside the exception class
(TRY003)
788-788: Avoid specifying long messages outside the exception class
(TRY003)
817-817: Consider moving this statement to an else block
(TRY300)
818-818: Do not catch blind exception: Exception
(BLE001)
843-843: Do not catch blind exception: Exception
(BLE001)
936-936: Do not catch blind exception: Exception
(BLE001)
937-939: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
956-956: Do not catch blind exception: Exception
(BLE001)
957-957: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
995-995: Do not catch blind exception: Exception
(BLE001)
1012-1012: Do not catch blind exception: Exception
(BLE001)
1030-1030: Do not catch blind exception: Exception
(BLE001)
1034-1037: Avoid specifying long messages outside the exception class
(TRY003)
1040-1040: Avoid specifying long messages outside the exception class
(TRY003)
1093-1093: Do not catch blind exception: Exception
(BLE001)
1094-1096: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1133-1133: Do not catch blind exception: Exception
(BLE001)
1134-1134: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1136-1136: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1164-1164: String contains ambiguous ℹ (INFORMATION SOURCE). Did you mean i (LATIN SMALL LETTER I)?
(RUF001)
1171-1171: Do not catch blind exception: Exception
(BLE001)
1213-1213: Unused function argument: args
(ARG001)
1240-1240: Avoid specifying long messages outside the exception class
(TRY003)
1324-1324: Do not catch blind exception: Exception
(BLE001)
1331-1331: Consider moving this statement to an else block
(TRY300)
1334-1336: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1351-1351: Do not catch blind exception: Exception
(BLE001)
1352-1352: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1397-1397: Unused function argument: args
(ARG001)
1420-1420: Avoid specifying long messages outside the exception class
(TRY003)
1429-1429: Avoid specifying long messages outside the exception class
(TRY003)
1436-1436: Create your own exception
(TRY002)
1436-1436: Avoid specifying long messages outside the exception class
(TRY003)
1468-1468: Abstract raise to an inner function
(TRY301)
1563-1563: Consider moving this statement to an else block
(TRY300)
1566-1568: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1587-1587: Do not catch blind exception: Exception
(BLE001)
1588-1588: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1596-1596: Unused function argument: execution_status
(ARG001)
1661-1661: Do not catch blind exception: Exception
(BLE001)
1662-1664: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1663-1663: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1670-1670: Unused function argument: execution_status
(ARG001)
1735-1735: Do not catch blind exception: Exception
(BLE001)
1736-1738: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1737-1737: Use explicit conversion flag
Replace with conversion flag
(RUF010)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (6)
workers/callback/tasks.py (6)
689-693: Drop in-function WorkerConfig import; use the module-level import

Avoid redundant imports and keep a single source.

Apply:

```diff
-    from shared.infrastructure.config import WorkerConfig
-    temp_config = WorkerConfig()
```
782-785: Log exceptions with traceback and chain the raise

Use logger.exception and raise ... from e for clearer debugging.

Apply:

```diff
-    except Exception as e:
-        logger.error(f"Failed to fetch workflow execution context: {e}")
-        raise ValueError(f"Could not get execution context: {e}")
+    except Exception as e:
+        logger.exception("Failed to fetch workflow execution context")
+        raise ValueError("Could not get execution context") from e
```
252-354: Nice unification of status computation and timeout detection

Consolidating aggregation, wall-clock time, and timeout heuristics reduces drift between callback paths.
116-135: Fix status mapping to be case-safe and return canonical failure value

Normalize keys and default to PipelineStatus.FAILURE.value to avoid mislabeling statuses when enum values aren't uppercase.

Apply:

```diff
-    status_mapping = {
-        # ExecutionStatus enum values
-        ExecutionStatus.COMPLETED.value: PipelineStatus.SUCCESS.value,
-        ExecutionStatus.ERROR.value: PipelineStatus.FAILURE.value,
-        ExecutionStatus.EXECUTING.value: PipelineStatus.INPROGRESS.value,
-        ExecutionStatus.PENDING.value: PipelineStatus.YET_TO_START.value,
-        ExecutionStatus.STOPPED.value: PipelineStatus.FAILURE.value,
-        ExecutionStatus.QUEUED.value: PipelineStatus.INPROGRESS.value,
-        ExecutionStatus.CANCELED.value: PipelineStatus.FAILURE.value,
-        # Legacy status values for backward compatibility
-        "SUCCESS": PipelineStatus.SUCCESS.value,  # Legacy alias for COMPLETED
-        "FAILED": PipelineStatus.FAILURE.value,  # Legacy alias for ERROR
-        "FAILURE": PipelineStatus.FAILURE.value,  # Legacy variant
-        "RUNNING": PipelineStatus.INPROGRESS.value,  # Legacy alias for EXECUTING
-        "INPROGRESS": PipelineStatus.INPROGRESS.value,  # Legacy variant
-        "YET_TO_START": PipelineStatus.YET_TO_START.value,  # Legacy variant
-    }
-
-    # Default to FAILURE for unknown statuses
-    return status_mapping.get(execution_status.upper(), "FAILURE")
+    key = str(execution_status).upper()
+    status_mapping = {
+        # Canonical
+        "COMPLETED": PipelineStatus.SUCCESS.value,
+        "ERROR": PipelineStatus.FAILURE.value,
+        "EXECUTING": PipelineStatus.INPROGRESS.value,
+        "PENDING": PipelineStatus.YET_TO_START.value,
+        "STOPPED": PipelineStatus.FAILURE.value,
+        "QUEUED": PipelineStatus.INPROGRESS.value,
+        "CANCELED": PipelineStatus.FAILURE.value,
+        # Legacy aliases
+        "SUCCESS": PipelineStatus.SUCCESS.value,
+        "FAILED": PipelineStatus.FAILURE.value,
+        "FAILURE": PipelineStatus.FAILURE.value,
+        "RUNNING": PipelineStatus.INPROGRESS.value,
+        "INPROGRESS": PipelineStatus.INPROGRESS.value,
+        "YET_TO_START": PipelineStatus.YET_TO_START.value,
+    }
+    return status_mapping.get(key, PipelineStatus.FAILURE.value)
```
497-501: Send ISO-8601 for last_run_time (backend expects datetime, not epoch)

Use an ISO UTC string to avoid serializer/DB mismatches; keep increment_run_count.

Apply:

```diff
-            last_run_time=time.time(),
+            last_run_time=datetime.now(timezone.utc).isoformat(),
```

Add import near the top:

```diff
@@
-import time
-from typing import Any
+import time
+from datetime import datetime, timezone
+from typing import Any
```

Please confirm the backend serializer expects ISO strings for last_run_time and supports increment_run_count.
828-845: Stop shadowing get_cache_manager; reuse the top-level import

The inner import pulls a different symbol path and risks inconsistent cache behavior. Reuse the already imported get_cache_manager from cache_utils and drop the ImportError branch.

Apply:

```diff
     try:
-        # Use the existing cache manager to clear execution cache
-        from shared.cache import get_cache_manager
-
-        cache_manager = get_cache_manager()
+        # Use the existing cache manager imported from cache_utils
+        cache_manager = get_cache_manager()
         if cache_manager and hasattr(cache_manager, "delete_execution_cache"):
             # Use the direct cache method similar to ExecutionCacheUtils.delete_execution
             cache_manager.delete_execution_cache(
                 workflow_id=context.workflow_id, execution_id=context.execution_id
             )
             logger.info(f"Cleared execution cache for {context.execution_id}")
         else:
             logger.debug("Cache manager not available or method not found")
-    except ImportError:
-        logger.debug("Cache manager not available for direct cleanup")
     except Exception as e:
         logger.warning(f"Failed to clear execution cache directly: {e}")
```
Actionable comments posted: 0
🧹 Nitpick comments (15)
workers/api-deployment/tasks.py (15)
59-59: Use specific exception types instead of bare Exception

Catching `Exception` is too broad and may hide programming errors. Use specific exception types when possible.

```diff
-    except Exception as log_error:
+    except (AttributeError, ValueError, TypeError) as log_error:
```
246-247: Improve exception logging and handling

Use `logging.exception` for better error context and avoid catching bare `Exception`.

```diff
-    except Exception as e:
-        logger.error(f"API execution failed: {e}")
+    except Exception as e:
+        logger.exception(f"API execution failed: {e}")
```
260-260: Use explicit string conversion

Use explicit conversion flag for better code clarity.

```diff
-    f"error={str(e)}",
+    f"error={e!s}",
```
415-416: Create custom exception class and improve error messaging

Creating a custom exception class and using shorter error messages would improve code maintainability.

```python
class WorkflowExecutionError(Exception):
    """Raised when workflow execution context cannot be retrieved."""
    pass
```

```diff
-    raise Exception(f"Failed to get execution context: {execution_response.error}")
+    raise WorkflowExecutionError(f"Execution context unavailable: {execution_response.error}")
```
605-644: Unused loop variable batch_index

The loop control variable `batch_index` is not used within the loop body.

```diff
-    for batch_index, batch in enumerate(batches):
+    for _batch_index, batch in enumerate(batches):
```
666-668: Abstract exception raising to inner function

Consider abstracting the exception logic to improve code organization.

```python
def _raise_orchestration_error(execution_id: str, message: str) -> None:
    """Helper to raise orchestration errors consistently."""
    logger.error(message)
    raise Exception(message)
```

```diff
-    if not result:
-        exception = f"Failed to queue execution task {execution_id}"
-        logger.error(exception)
-        raise Exception(exception)
+    if not result:
+        _raise_orchestration_error(execution_id, f"Failed to queue execution task {execution_id}")
```
692-694: Use explicit string conversion flags

Use explicit conversion flags for better code clarity and performance.

```diff
-    error_message=f"Error while processing files: {str(e)}",
+    error_message=f"Error while processing files: {e!s}",
```

```diff
-    logger.error(f"Execution {execution_id} failed: {str(e)}", exc_info=True)
+    logger.error(f"Execution {execution_id} failed: {e!s}", exc_info=True)
```
724-724: Improve error message structure

Consider using a shorter, more focused error message.

```diff
-    raise TypeError(f"Expected dict[str, FileHashData], got {type(input_files)}")
+    raise TypeError(f"Expected dict[str, FileHashData], got {type(input_files).__name__}")
```
796-796: Unused function parameter api_client

The `api_client` parameter is not used in the `_create_file_data` function. Since the function doesn't use `api_client`, either remove it or add functionality that uses it:

```diff
 def _create_file_data(
     workflow_id: str,
     execution_id: str,
     organization_id: str,
     pipeline_id: str | None,
     scheduled: bool,
     execution_mode: str | None,
     use_file_history: bool,
-    api_client: InternalAPIClient,
     total_files: int = 0,
     **kwargs: dict[str, Any],
 ) -> WorkerFileData:
```

And update the caller at line 796:

```diff
 file_data = _create_file_data(
     workflow_id=workflow_id,
     execution_id=execution_id,
     organization_id=schema_name,
     pipeline_id=pipeline_id,
     scheduled=scheduled,
     execution_mode=execution_mode_str,
     use_file_history=use_file_history,
-    api_client=api_client,
     total_files=total_files,
     **kwargs,
 )
```
954-955: Improve exception logging

Use `logging.exception` instead of `logging.error` for better traceback information.

```diff
-    logger.error(f"Error calculating manual review decisions for API batch: {e}")
+    logger.exception(f"Error calculating manual review decisions for API batch: {e}")
```
964-964: Unused function parameter self

The `self` parameter is not used in the `api_deployment_status_check` function body. If `self` is not needed, consider using `_self` to indicate it's intentionally unused, or remove it if not required by the Celery task decorator:

```diff
-def api_deployment_status_check(
-    self, execution_id: str, organization_id: str
-) -> dict[str, Any]:
+def api_deployment_status_check(
+    _self, execution_id: str, organization_id: str
+) -> dict[str, Any]:
```
985-987: Create custom exception and improve error handling

Using a custom exception class and shorter error messages would improve maintainability.

```python
class ExecutionContextError(Exception):
    """Raised when execution context cannot be retrieved."""
    pass
```

```diff
-    raise Exception(
-        f"Failed to get execution context: {execution_response.error}"
-    )
+    raise ExecutionContextError(f"Context unavailable: {execution_response.error}")
```
1026-1026: Use logging.exception for better error context

Use `logging.exception` to include the full traceback for debugging.

```diff
-    logger.error(f"Failed to check API deployment status: {e}")
+    logger.exception(f"Failed to check API deployment status: {e}")
```
1034-1034: Unused function parameter organization_id

The `organization_id` parameter is not used in the `api_deployment_cleanup` function. If the parameter is required by the task signature but not used, consider prefixing with underscore:

```diff
-def api_deployment_cleanup(
-    self, execution_id: str, organization_id: str
-) -> dict[str, Any]:
+def api_deployment_cleanup(
+    self, execution_id: str, _organization_id: str
+) -> dict[str, Any]:
```
1066-1066: Use logging.exception for better error context

Use `logging.exception` to include the full traceback for debugging.

```diff
-    logger.error(f"Cleanup failed for API deployment execution {execution_id}: {e}")
+    logger.exception(f"Cleanup failed for API deployment execution {execution_id}: {e}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
workers/api-deployment/tasks.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
workers/api-deployment/tasks.py
59-59: Do not catch blind exception: Exception
(BLE001)
241-241: Do not catch blind exception: Exception
(BLE001)
244-244: Consider moving this statement to an else block
(TRY300)
246-246: Do not catch blind exception: Exception
(BLE001)
247-247: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
260-260: Use explicit conversion flag
Replace with conversion flag
(RUF010)
271-271: Do not catch blind exception: Exception
(BLE001)
416-416: Create your own exception
(TRY002)
416-416: Avoid specifying long messages outside the exception class
(TRY003)
465-465: Unpacked variable files_to_process is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
550-550: Do not catch blind exception: Exception
(BLE001)
551-553: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
605-605: Loop control variable batch_index not used within loop body
Rename unused batch_index to _batch_index
(B007)
668-668: Abstract raise to an inner function
(TRY301)
668-668: Create your own exception
(TRY002)
692-692: Use explicit conversion flag
Replace with conversion flag
(RUF010)
694-694: Use explicit conversion flag
Replace with conversion flag
(RUF010)
724-724: Avoid specifying long messages outside the exception class
(TRY003)
796-796: Unused function argument: api_client
(ARG001)
952-952: Consider moving this statement to an else block
(TRY300)
954-954: Do not catch blind exception: Exception
(BLE001)
955-955: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
964-964: Unused function argument: self
(ARG001)
985-987: Abstract raise to an inner function
(TRY301)
985-987: Create your own exception
(TRY002)
985-987: Avoid specifying long messages outside the exception class
(TRY003)
1026-1026: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1034-1034: Unused function argument: organization_id
(ARG001)
1063-1063: Consider moving this statement to an else block
(TRY300)
1066-1066: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1185-1185: Consider moving this statement to an else block
(TRY300)
1187-1187: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (4)
workers/api-deployment/tasks.py (4)
134-282: Well-structured unified execution function

The `_unified_api_execution` function effectively consolidates duplicate logic from multiple task entry points with proper error handling, standardized logging, and StateStore cleanup on both success and error paths.
284-387: Clean task implementation with proper delegation

Both Celery task functions (`async_execute_bin_api` and `async_execute_bin`) properly delegate to the unified execution logic while maintaining backward compatibility. The task configuration matches Django backend patterns appropriately.
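In miniature, the delegation pattern being praised might look like the following hedged sketch; only the task names and the StateStore cleanup come from the review, while the signatures and `run_execution` are simplified illustrations:

```python
from celery import shared_task


def _unified_api_execution(execution_id: str, organization_id: str, api_mode: bool) -> dict:
    """One implementation shared by both task entry points."""
    try:
        return run_execution(execution_id, organization_id, api_mode)  # illustrative
    finally:
        StateStore.clear()  # cleanup on both success and error paths


@shared_task(bind=True)
def async_execute_bin_api(self, execution_id: str, organization_id: str) -> dict:
    return _unified_api_execution(execution_id, organization_id, api_mode=True)


@shared_task(bind=True)
def async_execute_bin(self, execution_id: str, organization_id: str) -> dict:
    return _unified_api_execution(execution_id, organization_id, api_mode=False)
```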
512-525: Security risk: Use of ast.literal_eval as fallback parser

The code uses `ast.literal_eval` as a fallback for parsing cached results, which poses potential security risks with untrusted input and may fail on complex Python literals.

Apply this diff to use a safer approach:

```diff
-                    # Fallback: try to parse Python string representation (legacy format)
-                    try:
-                        import ast
-
-                        cached_result_data = ast.literal_eval(
-                            cached_result_data
-                        )
-                        logger.info(
-                            f"Successfully parsed legacy Python string format for {hash_data.file_name}"
-                        )
-                    except (ValueError, SyntaxError) as parse_error:
-                        logger.warning(
-                            f"Failed to parse legacy format for {hash_data.file_name}: {parse_error}"
-                        )
-                        # Keep as string if all parsing fails
+                    # For legacy format, log a warning and keep as string
+                    logger.warning(
+                        f"Cached result for {hash_data.file_name} is in legacy format and cannot be parsed safely. "
+                        f"Consider re-processing this file to update the cache format."
+                    )
+                    # Keep as string rather than risk security issues with ast.literal_eval
```
1117-1117: Inconsistent return type in error path

The function `_check_file_history_api` is supposed to return a tuple but returns only `hash_values_of_files` in the error path at line 1117.

Apply this diff to fix the return statement:

```diff
     if not file_hashes:
         logger.info(
             f"No file hashes available for history check in execution {execution_id}, processing all files"
         )
-        return hash_values_of_files
+        return hash_values_of_files, {}
```
Actionable comments posted: 4
🧹 Nitpick comments (16)
docker/docker-compose.yaml (2)
280-307: Add HEALTH_PORT for consistency with other v2 workers.

Other v2 services expose a HEALTH_PORT matching their metrics port. Add it here to keep config uniform and health checks simple.

```diff
       - WORKER_NAME=callback-worker-v2
       - CALLBACK_METRICS_PORT=8083
+      - HEALTH_PORT=8083
```
246-279: Limit metrics ports to localhost in dev.

All v2 workers publish metrics/health ports on 0.0.0.0. For local/dev, bind to 127.0.0.1 to avoid exposing them on the host network.

Example (repeat per service):

```diff
-    ports:
-      - "8085:8090"
+    ports:
+      - "127.0.0.1:8085:8090"
```

If remote access is needed, consider a Traefik-secured route instead of host publishing.
Also applies to: 280-307, 308-347, 348-376, 377-426, 427-477, 503-548
backend/notification_v2/internal_api_views.py (4)
121-125: Log traceback and consider narrowing the catch.

Use logger.exception to preserve stack traces; ideally catch narrower errors (e.g., DatabaseError/ValueError) instead of bare Exception. (Ruff BLE001/TRY400)

```diff
-    except Exception as e:
-        logger.error(f"Error getting pipeline notifications for {pipeline_id}: {e}")
+    except Exception:
+        logger.exception(f"Error getting pipeline notifications for {pipeline_id}")
         return JsonResponse(
             {"status": "error", "message": INTERNAL_SERVER_ERROR_MSG}, status=500
         )
```
50-64: Deduplicate notification serialization via a helper/serializer.

The same mapping is repeated. Extract a helper to reduce drift and centralize redaction. Also lets you enforce a single schema for both pipeline and API.

Apply within these blocks:

```diff
-        notifications_data = []
-        for notification in notifications:
-            notifications_data.append(
-                {
-                    "id": str(notification.id),
-                    "notification_type": notification.notification_type,
-                    "platform": notification.platform,
-                    "url": notification.url,
-                    "authorization_type": notification.authorization_type,
-                    "authorization_key": notification.authorization_key,
-                    "authorization_header": notification.authorization_header,
-                    "max_retries": notification.max_retries,
-                    "is_active": notification.is_active,
-                }
-            )
+        notifications_data = [serialize_notification(n) for n in notifications]
```

Add this helper near the constants:

```python
def serialize_notification(notification):
    return {
        "id": str(notification.id),
        "notification_type": notification.notification_type,
        "platform": notification.platform,
        "url": notification.url,
        "authorization_type": notification.authorization_type,
        # Prefer redaction or credential_ref here if possible.
        "authorization_key": notification.authorization_key,
        "authorization_header": notification.authorization_header,
        "max_retries": notification.max_retries,
        "is_active": notification.is_active,
    }
```

Also applies to: 88-102
37-47: Minor: prefer get() over exists()+first() when querying by primary key.

exists()+first() performs two queries; get() does one and surfaces MultipleObjectsReturned if constraints regress.

```diff
-    if pipeline_queryset.exists():
-        pipeline = pipeline_queryset.first()
+    try:
+        pipeline = pipeline_queryset.get()
+    except Pipeline.DoesNotExist:
+        pipeline = None
```
30-33: Optional: enforce UUID path converters for internal notification endpoints
In backend/notification_v2/internal_urls.py, replace `<str:pipeline_id>` and `<str:api_id>` with `<uuid:pipeline_id>` / `<uuid:api_id>` (e.g. lines 26, 31, 37, 42) to fail fast on malformed IDs and avoid unnecessary DB lookups.

unstract/core/src/unstract/core/data_models.py (5)
13-13: Use typing.Callable and import it (prevents lint/type-check issues).

Replace builtin `callable` with `typing.Callable[[Any], Any]` and import it.

```diff
-from typing import Any
+from typing import Any, ClassVar, Callable
@@
-        transform_functions: dict[str, callable] | None = None,
+        transform_functions: dict[str, Callable[[Any], Any]] | None = None,
```

Also applies to: 49-51
231-233: Annotate class-level mutable constants with ClassVar (RUF012).

Prevents mutable class attribute lint and clarifies intent.

```diff
 class FileOperationConstants:
 @@
-    DEFAULT_FILE_PATTERNS = ["*"]
-    ALL_FILES_PATTERN = "*"
+    DEFAULT_FILE_PATTERNS: ClassVar[list[str]] = ["*"]
+    ALL_FILES_PATTERN: ClassVar[str] = "*"
```
707-710: Compute file hash in chunks to avoid loading entire file in memory.

Prevents high memory usage for large files and aligns with READ_CHUNK_SIZE.

```diff
-        with open(file_path, "rb") as f:
-            hash_value = hashlib.sha256(f.read()).hexdigest()
+        from .data_models import FileOperationConstants as _FOC  # local import to avoid cycles
+        sha = hashlib.sha256()
+        with open(file_path, "rb") as f:
+            for chunk in iter(lambda: f.read(_FOC.READ_CHUNK_SIZE), b""):
+                sha.update(chunk)
+        hash_value = sha.hexdigest()
```
917-923: Use timezone-aware timestamps consistently.

Elsewhere you use UTC-aware datetimes; set modified_at with UTC too.

```diff
-        self.modified_at = datetime.now()
+        self.modified_at = datetime.now(UTC)
```
1489-1491: Broaden type annotation for FileBatchData.files or validate shape earlier.

Current hint says `list[dict[str, Any]]`, but code accepts list, tuple, or dict entries. Consider `list[dict[str, Any] | list[Any] | tuple[Any, Any]]` or normalize shapes earlier.

workers/callback/tasks.py (5)
116-135: Normalize mapping fallback to PipelineStatus constant.

Return the enum value, not raw string literal, for consistency.

```diff
-    # Default to FAILURE for unknown statuses
-    return status_mapping.get(execution_status.upper(), "FAILURE")
+    # Default to FAILURE for unknown statuses
+    return status_mapping.get(execution_status.upper(), PipelineStatus.FAILURE.value)
```
972-972: Fix return type annotation: use typing.Any (not builtin any).

```diff
-def _get_execution_directories(context: CallbackContext) -> list[tuple[str, any, str]]:
+def _get_execution_directories(context: CallbackContext) -> list[tuple[str, Any, str]]:
```
691-695: Reuse top-level WorkerConfig import; remove inner import.

Avoid redundant imports and potential discrepancies in environments.

```diff
-    # Create temporary API client for initial execution fetch (no org context needed)
-    from shared.infrastructure.config import WorkerConfig
-
-    temp_config = WorkerConfig()
+    # Create temporary API client for initial execution fetch (no org context needed)
+    temp_config = WorkerConfig()
```
1340-1342: Log full tracebacks in failure paths.

Use logger.exception inside except blocks to preserve stack traces.

```diff
-        logger.error(
-            f"Unified batch callback processing failed for execution {context.execution_id}: {e}"
-        )
+        logger.exception(
+            "Unified batch callback processing failed for execution %s", context.execution_id
+        )
@@
-        logger.error(
-            f"API callback processing failed for execution {execution_id}: {e}"
-        )
+        logger.exception(
+            "API callback processing failed for execution %s", execution_id
+        )
```

Also applies to: 1574-1576
1139-1143: Optional: avoid emojis/special glyphs in logs for better log ingestion.

Some collectors/alerts choke on non-ASCII. Consider replacing with plain text.
Also applies to: 1175-1181, 1073-1076, 1085-1086, 935-939
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (4)
- backend/notification_v2/internal_api_views.py (1 hunks)
- docker/docker-compose.yaml (1 hunks)
- unstract/core/src/unstract/core/data_models.py (1 hunks)
- workers/callback/tasks.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
workers/callback/tasks.py
93-93: Do not catch blind exception: Exception
(BLE001)
179-179: Consider moving this statement to an else block
(TRY300)
181-181: Do not catch blind exception: Exception
(BLE001)
188-188: Unused function argument: execution_id
(ARG001)
221-221: Consider moving this statement to an else block
(TRY300)
312-312: Do not catch blind exception: Exception
(BLE001)
409-409: Consider moving this statement to an else block
(TRY300)
534-534: Do not catch blind exception: Exception
(BLE001)
547-547: Use explicit conversion flag
Replace with conversion flag
(RUF010)
562-562: Unused function argument: workflow_id
(ARG001)
610-614: Consider moving this statement to an else block
(TRY300)
616-616: Do not catch blind exception: Exception
(BLE001)
626-626: Unused function argument: results
(ARG001)
649-649: Avoid specifying long messages outside the exception class
(TRY003)
669-671: Abstract raise to an inner function
(TRY301)
669-671: Avoid specifying long messages outside the exception class
(TRY003)
703-705: Avoid specifying long messages outside the exception class
(TRY003)
721-723: Avoid specifying long messages outside the exception class
(TRY003)
793-793: Do not catch blind exception: Exception
(BLE001)
794-794: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
795-795: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
795-795: Avoid specifying long messages outside the exception class
(TRY003)
799-799: Avoid specifying long messages outside the exception class
(TRY003)
828-828: Consider moving this statement to an else block
(TRY300)
829-829: Do not catch blind exception: Exception
(BLE001)
854-854: Do not catch blind exception: Exception
(BLE001)
947-947: Do not catch blind exception: Exception
(BLE001)
948-950: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
967-967: Do not catch blind exception: Exception
(BLE001)
968-968: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1006-1006: Do not catch blind exception: Exception
(BLE001)
1023-1023: Do not catch blind exception: Exception
(BLE001)
1041-1041: Do not catch blind exception: Exception
(BLE001)
1045-1048: Avoid specifying long messages outside the exception class
(TRY003)
1051-1051: Avoid specifying long messages outside the exception class
(TRY003)
1104-1104: Do not catch blind exception: Exception
(BLE001)
1105-1107: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1144-1144: Do not catch blind exception: Exception
(BLE001)
1145-1145: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1147-1147: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1175-1175: String contains ambiguous ℹ (INFORMATION SOURCE). Did you mean i (LATIN SMALL LETTER I)?
(RUF001)
1182-1182: Do not catch blind exception: Exception
(BLE001)
1224-1224: Unused function argument: args
(ARG001)
1251-1251: Avoid specifying long messages outside the exception class
(TRY003)
1330-1330: Do not catch blind exception: Exception
(BLE001)
1337-1337: Consider moving this statement to an else block
(TRY300)
1340-1342: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1357-1357: Do not catch blind exception: Exception
(BLE001)
1358-1358: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1403-1403: Unused function argument: args
(ARG001)
1426-1426: Avoid specifying long messages outside the exception class
(TRY003)
1435-1435: Avoid specifying long messages outside the exception class
(TRY003)
1444-1444: Create your own exception
(TRY002)
1444-1444: Avoid specifying long messages outside the exception class
(TRY003)
1476-1476: Abstract raise to an inner function
(TRY301)
1571-1571: Consider moving this statement to an else block
(TRY300)
1574-1576: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1595-1595: Do not catch blind exception: Exception
(BLE001)
1596-1596: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1604-1604: Unused function argument: execution_status
(ARG001)
1669-1669: Do not catch blind exception: Exception
(BLE001)
1670-1672: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1671-1671: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1678-1678: Unused function argument: execution_status
(ARG001)
1743-1743: Do not catch blind exception: Exception
(BLE001)
1744-1746: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
1745-1745: Use explicit conversion flag
Replace with conversion flag
(RUF010)
unstract/core/src/unstract/core/data_models.py
113-113: Do not catch blind exception: Exception
(BLE001)
172-172: Avoid specifying long messages outside the exception class
(TRY003)
231-231: Mutable class attributes should be annotated with typing.ClassVar
(RUF012)
387-387: Consider moving this statement to an else block
(TRY300)
389-389: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
389-389: Avoid specifying long messages outside the exception class
(TRY003)
596-598: Avoid specifying long messages outside the exception class
(TRY003)
690-690: Avoid specifying long messages outside the exception class
(TRY003)
692-692: Avoid specifying long messages outside the exception class
(TRY003)
751-754: Avoid specifying long messages outside the exception class
(TRY003)
783-783: Avoid specifying long messages outside the exception class
(TRY003)
785-785: Avoid specifying long messages outside the exception class
(TRY003)
787-790: Avoid specifying long messages outside the exception class
(TRY003)
883-883: Avoid specifying long messages outside the exception class
(TRY003)
885-885: Avoid specifying long messages outside the exception class
(TRY003)
945-945: Avoid specifying long messages outside the exception class
(TRY003)
947-947: Avoid specifying long messages outside the exception class
(TRY003)
1311-1313: Avoid specifying long messages outside the exception class
(TRY003)
1411-1413: Avoid specifying long messages outside the exception class
(TRY003)
1421-1424: Avoid specifying long messages outside the exception class
(TRY003)
1442-1444: Avoid specifying long messages outside the exception class
(TRY003)
1446-1448: Avoid specifying long messages outside the exception class
(TRY003)
1450-1452: Avoid specifying long messages outside the exception class
(TRY003)
1454-1456: Avoid specifying long messages outside the exception class
(TRY003)
1472-1474: Avoid specifying long messages outside the exception class
(TRY003)
1473-1473: Use explicit conversion flag
Replace with conversion flag
(RUF010)
1504-1506: Avoid specifying long messages outside the exception class
(TRY003)
1512-1515: Avoid specifying long messages outside the exception class
(TRY003)
1518-1520: Avoid specifying long messages outside the exception class
(TRY003)
1527-1529: Avoid specifying long messages outside the exception class
(TRY003)
1532-1534: Avoid specifying long messages outside the exception class
(TRY003)
1536-1538: Avoid specifying long messages outside the exception class
(TRY003)
1542-1544: Avoid specifying long messages outside the exception class
(TRY003)
1547-1549: Avoid specifying long messages outside the exception class
(TRY003)
1551-1553: Avoid specifying long messages outside the exception class
(TRY003)
1561-1563: Avoid specifying long messages outside the exception class
(TRY003)
1565-1567: Avoid specifying long messages outside the exception class
(TRY003)
1572-1574: Avoid specifying long messages outside the exception class
(TRY003)
1573-1573: Use explicit conversion flag
Replace with conversion flag
(RUF010)
backend/notification_v2/internal_api_views.py
121-121: Do not catch blind exception: Exception
(BLE001)
122-122: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
176-176: Do not catch blind exception: Exception
(BLE001)
177-177: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
212-212: Do not catch blind exception: Exception
(BLE001)
213-213: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
248-248: Do not catch blind exception: Exception
(BLE001)
249-249: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
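Most of these findings reduce to a few recurring patterns (BLE001 narrow-except, TRY400 `logging.exception`, B904 exception chaining, TRY003 custom exception classes). As a minimal, self-contained sketch of the idiomatic fixes — generic names, not the project's actual code:

```python
import logging

logger = logging.getLogger(__name__)


class ConfigError(Exception):
    """Project-specific error type (TRY003: keep long messages in the class)."""


def load_config(raw: dict) -> dict:
    try:
        return {"max_files": int(raw["max_files"])}
    except KeyError as err:
        # B904: chain the original error so tracebacks stay distinguishable
        raise ConfigError("missing max_files") from err


def cleanup(resource_id: str) -> None:
    try:
        ...  # actual cleanup work
    except OSError:
        # BLE001: catch the narrowest exception type you can
        # TRY400: logging.exception records the traceback automatically
        logger.exception("Cleanup failed for %s", resource_id)
```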
🔇 Additional comments (11)
docker/docker-compose.yaml (2)
348-376: Align GENERAL_METRICS_PORT with exposed container port (8082).

`GENERAL_METRICS_PORT=8081` while `HEALTH_PORT` and the published port target 8082; the metrics endpoint is likely on 8082. Align to avoid broken scraping and health/metrics divergence.
Apply:
```diff
-      - GENERAL_METRICS_PORT=8081
+      - GENERAL_METRICS_PORT=8082
```
261-269: Verify Celery pool selection and version support.
Ensure the image runs Celery 5.5.x and that using the threads pool (default here) suits your I/O-bound workloads; threads disable soft_timeouts and max_tasks_per_child and are limited by the GIL. Benchmark under threads, confirm prefetch and concurrency behave as expected in staging, and prefer prefork for CPU-bound tasks. Applies to lines 261–269, 324–340, 392–419, 442–470, 518–541.

backend/notification_v2/internal_api_views.py (5)
172-180: Don't swallow Http404; return JSON 404 and log traceback.

get_object_or_404 raises Http404, not APIDeployment.DoesNotExist. Map it to 404 JSON and use logger.exception for stack traces. (Ruff BLE001/TRY400)
```diff
-    except APIDeployment.DoesNotExist:
-        return JsonResponse(
-            {"status": "error", "message": "API deployment not found"}, status=404
-        )
-    except Exception as e:
-        logger.error(f"Error getting API notifications for {api_id}: {e}")
+    except Http404:
+        return JsonResponse(
+            {"status": "error", "message": "API deployment not found"}, status=404
+        )
+    except Exception:
+        logger.exception(f"Error getting API notifications for {api_id}")
         return JsonResponse(
             {"status": "error", "message": INTERNAL_SERVER_ERROR_MSG}, status=500
         )
```
208-216: Same Http404 handling + traceback for get_pipeline_data.

```diff
-    except Pipeline.DoesNotExist:
-        return JsonResponse(
-            {"status": "error", "message": "Pipeline not found"}, status=404
-        )
-    except Exception as e:
-        logger.error(f"Error getting pipeline data for {pipeline_id}: {e}")
+    except Http404:
+        return JsonResponse(
+            {"status": "error", "message": "Pipeline not found"}, status=404
+        )
+    except Exception:
+        logger.exception(f"Error getting pipeline data for {pipeline_id}")
         return JsonResponse(
             {"status": "error", "message": INTERNAL_SERVER_ERROR_MSG}, status=500
         )
```
244-252: Same Http404 handling + traceback for get_api_data.

```diff
-    except APIDeployment.DoesNotExist:
-        return JsonResponse(
-            {"status": "error", "message": "API deployment not found"}, status=404
-        )
-    except Exception as e:
-        logger.error(f"Error getting API data for {api_id}: {e}")
+    except Http404:
+        return JsonResponse(
+            {"status": "error", "message": "API deployment not found"}, status=404
+        )
+    except Exception:
+        logger.exception(f"Error getting API data for {api_id}")
         return JsonResponse(
             {"status": "error", "message": INTERNAL_SERVER_ERROR_MSG}, status=500
         )
```
15-17: Import Http404 to return proper JSON 404s with get_object_or_404.

Needed for the except blocks below; currently Http404 would be swallowed or turn into a default HTML response.
```diff
-from django.http import JsonResponse
+from django.http import JsonResponse, Http404
```
39-43: organization_field argument is correct; no action needed.

The `DefaultOrganizationMixin` defines the FK as `organization`, so using `filter_queryset_by_organization(..., "organization")` on both `Pipeline` and `APIDeployment` is valid.

unstract/core/src/unstract/core/data_models.py (2)
888-896: Guard .isoformat() when created_at/modified_at may already be strings.

```diff
-        if self.created_at:
-            data["created_at"] = self.created_at.isoformat()
-        if self.modified_at:
-            data["modified_at"] = self.modified_at.isoformat()
+        if self.created_at:
+            data["created_at"] = (
+                self.created_at.isoformat()
+                if isinstance(self.created_at, datetime)
+                else self.created_at
+            )
+        if self.modified_at:
+            data["modified_at"] = (
+                self.modified_at.isoformat()
+                if isinstance(self.modified_at, datetime)
+                else self.modified_at
+            )
```
1012-1023: Repeat the datetime guard in WorkflowExecutionData.to_dict().

```diff
-        if self.created_at:
-            data["created_at"] = self.created_at.isoformat()
-        if self.modified_at:
-            data["modified_at"] = self.modified_at.isoformat()
+        if self.created_at:
+            data["created_at"] = (
+                self.created_at.isoformat()
+                if isinstance(self.created_at, datetime)
+                else self.created_at
+            )
+        if self.modified_at:
+            data["modified_at"] = (
+                self.modified_at.isoformat()
+                if isinstance(self.modified_at, datetime)
+                else self.modified_at
+            )
```

workers/callback/tasks.py (2)
841-849: Avoid mixed cache imports and symbol shadowing; reuse top-level get_cache_manager.

```diff
-    # Use the existing cache manager to clear execution cache
-    from shared.cache import get_cache_manager
-
-    cache_manager = get_cache_manager()
+    # Use the cache manager already imported at module top
+    cache_manager = get_cache_manager()
```
499-502: Send ISO-8601 for last_run_time; backend DateTimeField expects an ISO string.

Epoch seconds cause serializer mismatches downstream.
```diff
-            last_run_time=time.time(),
+            from datetime import datetime, timezone
+            last_run_time=datetime.now(timezone.utc).isoformat(),
```
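The two timestamp fixes above (guarding `.isoformat()` and converting epoch floats) could share one normalization path. A minimal sketch, assuming nothing about the project beyond the standard library (the helper name `to_iso8601` is invented here):

```python
from __future__ import annotations

from datetime import datetime, timezone


def to_iso8601(value: datetime | float | str | None) -> str | None:
    """Normalize datetimes, epoch seconds, or pre-formatted strings to ISO-8601."""
    if value is None:
        return None
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (int, float)):  # epoch seconds, e.g. time.time()
        return datetime.fromtimestamp(value, tz=timezone.utc).isoformat()
    return str(value)  # assume the caller already passed an ISO string
```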
```python
            "id": str(notification.id),
            "notification_type": notification.notification_type,
            "platform": notification.platform,
            "url": notification.url,
            "authorization_type": notification.authorization_type,
            "authorization_key": notification.authorization_key,
            "authorization_header": notification.authorization_header,
            "max_retries": notification.max_retries,
            "is_active": notification.is_active,
        }
```
Avoid returning raw auth credentials in responses.
authorization_key/header likely contain secrets. Even for internal S2S, prefer returning a credential_ref and have workers fetch/resolve via a short‑lived, scoped token or internal secrets service; at minimum, encrypt at rest, transmit over TLS, and never log these fields. This is a security/compliance risk.
Also applies to: 92-101
🤖 Prompt for AI Agents
In backend/notification_v2/internal_api_views.py around lines 54-63 (and
similarly 92-101), the view is returning raw authorization_key and
authorization_header values which are sensitive; remove these fields from the
API response and replace them with a non-sensitive credential_ref (e.g.,
credential_id or masked value) that callers can use to request the real secret
from the internal secrets service; update callers/workers to resolve the
credential_ref via the internal secrets service using a short-lived scoped
token, ensure secrets are never logged, and keep secrets encrypted at rest and
transmitted only over TLS.
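As a rough sketch of the shape such a response could take — the `credential_ref` field and its path format are illustrative assumptions, not an existing API:

```python
def serialize_notification(notification) -> dict:
    """Build a worker-facing payload that carries a reference instead of secrets."""
    return {
        "id": str(notification.id),
        "notification_type": notification.notification_type,
        "platform": notification.platform,
        "url": notification.url,
        "authorization_type": notification.authorization_type,
        # Reference only; workers exchange it for the real secret out of band
        # via a short-lived, scoped token against the internal secrets service.
        "credential_ref": f"notification/{notification.id}/auth",
        "max_retries": notification.max_retries,
        "is_active": notification.is_active,
    }
```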
```yaml
  worker-file-processing-v2:
    image: unstract/worker-unified:${VERSION}
    container_name: unstract-worker-file-processing-v2
    restart: unless-stopped
    # command: ["file-processing"]
    command: [".venv/bin/celery", "-A", "worker", "worker", "--queues=file_processing,api_file_processing,file_processing_priority", "--loglevel=INFO", "--pool=prefork", "--concurrency=4", "--prefetch-multiplier=1", "--without-gossip", "--without-mingle", "--without-heartbeat"]
    ports:
      - "8087:8082"
    env_file:
      - ../workers/.env
      - ./essentials.env
    depends_on:
      - db
      - redis
      - rabbitmq
    environment:
      - ENVIRONMENT=development
      - APPLICATION_NAME=unstract-worker-file-processing-v2
      - WORKER_TYPE=file_processing
      - WORKER_MODE=oss
      - WORKER_NAME=file-processing-worker-v2
      - FILE_PROCESSING_METRICS_PORT=8082
      # OSS Configuration - Enterprise features disabled
      - MANUAL_REVIEW_ENABLED=false
      - ENTERPRISE_FEATURES_ENABLED=false
      - PLUGIN_REGISTRY_MODE=oss
      # Configurable Celery options
      - CELERY_QUEUES_FILE_PROCESSING=${CELERY_QUEUES_FILE_PROCESSING:-file_processing,api_file_processing}
      - CELERY_POOL=${WORKER_FILE_PROCESSING_POOL:-threads}
      - CELERY_PREFETCH_MULTIPLIER=${WORKER_FILE_PROCESSING_PREFETCH_MULTIPLIER:-1}
      - CELERY_CONCURRENCY=${WORKER_FILE_PROCESSING_CONCURRENCY:-4}
      - CELERY_EXTRA_ARGS=${WORKER_FILE_PROCESSING_EXTRA_ARGS:-}
    labels:
      - traefik.enable=false
    volumes:
      - ./workflow_data:/data
      - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
    profiles:
      - workers-v2
```
Unify worker command to honor env-driven tuning (prefers entrypoint mode).
This service hard-codes Celery CLI args; the CELERY_* env vars you set below (pool, prefetch, queues, concurrency, extra args) won’t take effect. It also diverges from other v2 workers using image-level command shims.
Use the unified command and let the entrypoint read the env:
```diff
-    # command: ["file-processing"]
-    command: [".venv/bin/celery", "-A", "worker", "worker", "--queues=file_processing,api_file_processing,file_processing_priority", "--loglevel=INFO", "--pool=prefork", "--concurrency=4", "--prefetch-multiplier=1", "--without-gossip", "--without-mingle", "--without-heartbeat"]
+    command: ["file-processing"]
```

Optionally, if you must keep the explicit Celery CLI, remove/rename the unused CELERY_* envs here to prevent confusion and document the chosen flags explicitly.
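For reference, an entrypoint that honors those variables might look roughly like this. This is a sketch under stated assumptions (the queue default and binary path mirror the compose file above; the real entrypoint script in the image may differ):

```python
import os
import shlex
import subprocess


def build_celery_argv(default_queues: str) -> list[str]:
    """Assemble the Celery CLI from CELERY_* env vars with sane fallbacks."""
    argv = [
        ".venv/bin/celery", "-A", "worker", "worker",
        f"--queues={os.getenv('CELERY_QUEUES_FILE_PROCESSING', default_queues)}",
        f"--pool={os.getenv('CELERY_POOL', 'threads')}",
        f"--concurrency={os.getenv('CELERY_CONCURRENCY', '4')}",
        f"--prefetch-multiplier={os.getenv('CELERY_PREFETCH_MULTIPLIER', '1')}",
        "--loglevel=INFO",
    ]
    # CELERY_EXTRA_ARGS lets operators append flags without editing the image
    argv += shlex.split(os.getenv("CELERY_EXTRA_ARGS", ""))
    return argv


if __name__ == "__main__":
    subprocess.run(build_celery_argv("file_processing,api_file_processing"), check=True)
```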
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```yaml
  worker-file-processing-v2:
    image: unstract/worker-unified:${VERSION}
    container_name: unstract-worker-file-processing-v2
    restart: unless-stopped
    command: ["file-processing"]
    ports:
      - "8087:8082"
    env_file:
      - ../workers/.env
      - ./essentials.env
    depends_on:
      - db
      - redis
      - rabbitmq
    environment:
      - ENVIRONMENT=development
      - APPLICATION_NAME=unstract-worker-file-processing-v2
      - WORKER_TYPE=file_processing
      - WORKER_MODE=oss
      - WORKER_NAME=file-processing-worker-v2
      - FILE_PROCESSING_METRICS_PORT=8082
      # OSS Configuration - Enterprise features disabled
      - MANUAL_REVIEW_ENABLED=false
      - ENTERPRISE_FEATURES_ENABLED=false
      - PLUGIN_REGISTRY_MODE=oss
      # Configurable Celery options
      - CELERY_QUEUES_FILE_PROCESSING=${CELERY_QUEUES_FILE_PROCESSING:-file_processing,api_file_processing}
      - CELERY_POOL=${WORKER_FILE_PROCESSING_POOL:-threads}
      - CELERY_PREFETCH_MULTIPLIER=${WORKER_FILE_PROCESSING_PREFETCH_MULTIPLIER:-1}
      - CELERY_CONCURRENCY=${WORKER_FILE_PROCESSING_CONCURRENCY:-4}
      - CELERY_EXTRA_ARGS=${WORKER_FILE_PROCESSING_EXTRA_ARGS:-}
    labels:
      - traefik.enable=false
    volumes:
      - ./workflow_data:/data
      - ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
    profiles:
      - workers-v2
```
🤖 Prompt for AI Agents
In docker/docker-compose.yaml around lines 308-347 the service
worker-file-processing-v2 hard-codes the Celery CLI args via a command which
prevents the CELERY_* env vars below from taking effect and diverges from other
v2 workers; remove the explicit command so the image entrypoint can read and
apply CELERY_QUEUES_FILE_PROCESSING, CELERY_POOL, CELERY_PREFETCH_MULTIPLIER,
CELERY_CONCURRENCY and CELERY_EXTRA_ARGS at runtime (i.e., restore the unified
entrypoint usage used by other v2 workers), or if you insist on an inline Celery
CLI, update the command to interpolate those env vars from the compose
environment and remove/rename the now-unused CELERY_* env variables to avoid
confusion and document the chosen flags.
```python
    def get_max_files(cls, config: dict, default: int = 100) -> int:
        """Get max files setting from config using both naming conventions."""
        return int(config.get(cls.MAX_FILES) or config.get(cls.MAX_FILES_CAMEL, default))
```
Avoid “or” when reading numeric config; 0 gets overwritten by default.
If a user sets 0 intentionally, `or` will coerce it to the default. Use None checks instead.
```diff
-        return int(config.get(cls.MAX_FILES) or config.get(cls.MAX_FILES_CAMEL, default))
+        value = config.get(cls.MAX_FILES, config.get(cls.MAX_FILES_CAMEL, None))
+        return int(value) if value is not None else default
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    def get_max_files(cls, config: dict, default: int = 100) -> int:
        """Get max files setting from config using both naming conventions."""
        value = config.get(cls.MAX_FILES, config.get(cls.MAX_FILES_CAMEL, None))
        return int(value) if value is not None else default
```
🤖 Prompt for AI Agents
In unstract/core/src/unstract/core/data_models.py around lines 278 to 281, the
current use of "or" when reading numeric config will treat 0 as false and fall
back to the default; change the logic to explicitly check for None (and missing
keys) instead of using "or". Retrieve the value for cls.MAX_FILES first with
config.get(...), if that returns None then try cls.MAX_FILES_CAMEL, if both are
None return the default, otherwise convert the found value to int and return it;
ensure you do not treat other falsy numeric values (like 0) as missing.
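The pitfall is easy to demonstrate in isolation; here the 0 a user set deliberately is silently replaced by the default (a plain dict stands in for the real config object):

```python
config = {"max_files": 0}  # user explicitly sets the limit to zero

# Falsy coercion: 0 is treated as "missing" and the default wins
assert (config.get("max_files") or 100) == 100

# Explicit None check preserves the user's intent
value = config.get("max_files", config.get("maxFiles"))
assert (int(value) if value is not None else 100) == 0
```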
```python
    def __post_init__(self):
        source_connection_type = self.source_config.connection_type
        destination_connection_type = self.destination_config.connection_type
        if (
            source_connection_type == ConnectionType.FILESYSTEM.value
            and destination_connection_type == ConnectionType.FILESYSTEM.value
        ):
            self.workflow_type = WorkflowType.TASK
        elif (
            source_connection_type == ConnectionType.FILESYSTEM.value
            and destination_connection_type == ConnectionType.DATABASE.value
        ):
            self.workflow_type = WorkflowType.ETL
        elif (
            source_connection_type == ConnectionType.API.value
            and destination_connection_type == ConnectionType.API.value
        ):
            self.workflow_type = WorkflowType.API
        elif (
            source_connection_type == ConnectionType.FILESYSTEM.value
            and destination_connection_type == ConnectionType.MANUALREVIEW.value
        ):
            self.workflow_type = WorkflowType.DEFAULT
```
Critical: workflow_type normalization and broken legacy conversion.
- workflow_type can be str (from `from_dict`), but `to_dict` uses `.value` → AttributeError.
- The legacy converter references the nonexistent `workflow_type_detection`.
Add enum normalization in __post_init__ and fix legacy mapping.
```diff
 class WorkflowDefinitionResponseData:
@@
-    def __post_init__(self):
+    def __post_init__(self):
+        # Normalize workflow_type if provided as string
+        if isinstance(self.workflow_type, str):
+            try:
+                self.workflow_type = WorkflowType(self.workflow_type)
+            except ValueError:
+                self.workflow_type = WorkflowType.DEFAULT
         source_connection_type = self.source_config.connection_type
@@
 def workflow_definition_to_legacy_format(
@@
-    legacy_format["workflow_type"] = workflow_def.workflow_type_detection.workflow_type
+    legacy_format["workflow_type"] = (
+        workflow_def.workflow_type.value
+        if isinstance(workflow_def.workflow_type, WorkflowType)
+        else str(workflow_def.workflow_type)
+    )
```

Also applies to: 1854-1881
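A standalone sketch of the normalization idea, with a simplified enum and dataclass rather than the project's actual definitions:

```python
from dataclasses import dataclass
from enum import Enum


class WorkflowType(Enum):
    ETL = "ETL"
    TASK = "TASK"
    API = "API"
    DEFAULT = "DEFAULT"


@dataclass
class WorkflowDef:
    workflow_type: "WorkflowType | str" = WorkflowType.DEFAULT

    def __post_init__(self) -> None:
        # Accept strings from deserialized payloads; fall back on unknown values
        if isinstance(self.workflow_type, str):
            try:
                self.workflow_type = WorkflowType(self.workflow_type)
            except ValueError:
                self.workflow_type = WorkflowType.DEFAULT


assert WorkflowDef("ETL").workflow_type is WorkflowType.ETL
assert WorkflowDef("bogus").workflow_type is WorkflowType.DEFAULT
```

With this invariant in place, `to_dict` can safely use `.value` because the field is always an enum member after construction.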
…-MISC-remove-django-dependency-from-celery-workers
…cker, and removing redis keys
Actionable comments posted: 0
🧹 Nitpick comments (2)
unstract/core/src/unstract/core/tool_execution_status.py (2)
184-188: Consider catching specific exceptions or documenting the rationale for broad exception handling.

Catching all exceptions makes the operation resilient but can mask unexpected failures like Redis connection issues or programming errors. If broad catching is intentional for cache cleanup resilience, consider:

- Catching specific exceptions (e.g., `redis.RedisError`, `redis.ConnectionError`) for known failure modes
- Logging at ERROR level for truly unexpected exceptions
- Adding a comment documenting why all exceptions are suppressed

Example with specific exception handling:

```diff
-        except Exception as e:
-            logger.warning(
-                f"Failed to delete status for tool execution {tool_execution_data.execution_id}: {e}. "
-            )
-            return
+        except redis.RedisError as e:
+            logger.warning(
+                f"Failed to delete status for tool execution {tool_execution_data.execution_id}: {e}"
+            )
+            return
+        except Exception as e:
+            logger.error(
+                f"Unexpected error deleting status for tool execution {tool_execution_data.execution_id}: {e}",
+                exc_info=True
+            )
+            return
```
206-210: Consider catching specific exceptions or documenting the rationale for broad exception handling.

Same concern as in `delete_status` above: catching all exceptions can mask Redis connection failures or other unexpected errors. Consider the same refinements: catch specific exception types for known failure modes and log unexpected exceptions at ERROR level.

Example with specific exception handling:

```diff
-        except Exception as e:
-            logger.warning(
-                f"Failed to update TTL for tool execution {tool_execution_data.execution_id}: {e}. "
-            )
-            return
+        except redis.RedisError as e:
+            logger.warning(
+                f"Failed to update TTL for tool execution {tool_execution_data.execution_id}: {e}"
+            )
+            return
+        except Exception as e:
+            logger.error(
+                f"Unexpected error updating TTL for tool execution {tool_execution_data.execution_id}: {e}",
+                exc_info=True
+            )
+            return
```
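Since both methods need the same split, one way to avoid duplicating it is a small wrapper. A minimal sketch, assuming the standard `redis` client library (the decorator name is invented here):

```python
import logging
from functools import wraps

import redis

logger = logging.getLogger(__name__)


def tolerate_redis_errors(operation: str):
    """Log-and-continue for known Redis failures; surface everything else loudly."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except redis.RedisError as e:
                # Known failure mode: Redis down or unreachable
                logger.warning("%s failed (Redis unavailable?): %s", operation, e)
                return None
            except Exception:
                # Unexpected: record the full traceback at ERROR level
                logger.exception("Unexpected error during %s", operation)
                return None
        return wrapper
    return decorator
```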
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to Reviews > Disable Cache setting
Knowledge base: Disabled due to Reviews -> Disable Knowledge Base setting
📒 Files selected for processing (1)
unstract/core/src/unstract/core/tool_execution_status.py (4 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
unstract/core/src/unstract/core/tool_execution_status.py
184-184: Do not catch blind exception: Exception
(BLE001)
206-206: Do not catch blind exception: Exception
(BLE001)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (1)
unstract/core/src/unstract/core/tool_execution_status.py (1)
1-1: LGTM: Standard logging setup.

The addition of module-level logging follows Python best practices and enables observability for the error handling added below.
Also applies to: 13-13
ritwik-g
left a comment
Approving based on integration testing observations. Further testing will be done in staging.
…rom-celery-workers Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
for more information, see https://pre-commit.ci
…rnal APIs (#1494)

* UN-2470 [MISC] Remove Django dependency from Celery workers

  This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead.

  Key changes:
  - Created separate worker modules (api-deployment, callback, file_processing, general)
  - Added internal API endpoints for worker communication
  - Implemented Django-free task execution where appropriate
  - Added shared utilities and client facades
  - Updated container configurations for new worker architecture

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix pre-commit issues: file permissions and ruff errors

  Setup the docker for new workers
  - Add executable permissions to worker entrypoint files
  - Fix import order in namespace package __init__.py
  - Remove unused variable api_status in general worker
  - Address ruff E402 and F841 errors

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* refactoreed, Dockerfiles,fixes
* flexibility on celery run commands
* added debug logs
* handled filehistory for API
* cleanup
* cleanup
* cloud plugin structure
* minor changes in import plugin
* added notification and logger workers under new worker module
* add docker compatibility for new workers
* handled docker issues
* log consumer worker fixes
* added scheduler worker
* minor env changes
* cleanup the logs
* minor changes in logs
* resolved scheduler worker issues
* cleanup and refactor
* ensuring backward compatibbility to existing wokers
* added configuration internal apis and cache utils
* optimization
* Fix API client singleton pattern to share HTTP sessions
  - Fix flawed singleton implementation that was trying to share BaseAPIClient instances
  - Now properly shares HTTP sessions between specialized clients
  - Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
  - Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
  - Added debug logging to verify singleton pattern activation
* cleanup and structuring
* cleanup in callback
* file system connectors issue
* celery env values changes
* optional gossip
* variables for sync, mingle and gossip
* Fix for file type check
* Task pipeline issue resolving
* api deployement failed response handled
* Task pipline fixes
* updated file history cleanup with active file execution
* pipline status update and workflow ui page execution
* cleanup and resolvinf conflicts
* remove unstract-core from conenctoprs
* Commit uv.lock changes
* uv locks updates
* resolve migration issues
* defer connector-metadtda
* Fix connector migration for production scale
  - Add encryption key handling with defer() to prevent decryption failures
  - Add final cleanup step to fix duplicate connector names
  - Optimize for large datasets with batch processing and bulk operations
  - Ensure unique constraint in migration 0004 can be created successfully

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>
* hitl fixes
* minor fixes on hitl
* api_hub related changes
* dockerfile fixes
* api client cache fixes with actual response class
* fix: tags and llm_profile_id
* optimized clear cache
* cleanup
* enhanced logs
* added more handling on is file dir and added loggers
* cleanup the runplatform script
* internal apis are excempting from csrf
* sonal cloud issues
* sona-cloud issues
* resolving sonar cloud issues
* resolving sonar cloud issues
* Delta: added Batch size fix in workers
* comments addressed
* celery configurational changes for new workers
* fiixes in callback regaurding the pipline type check
* change internal url registry logic
* gitignore changes
* gitignore changes
* addressng pr cmmnets and cleanup the codes
* adding missed profiles for v2
* sonal cloud blocker issues resolved
* imlement otel
* Commit uv.lock changes
* handle execution time and some cleanup
* adding user_data in metadata Pr: #1544
* scheduler backward compatibitlity
* replace user_data with custom_data
* Commit uv.lock changes
* celery worker command issue resolved
* enhance package imports in connectors by changing to lazy imports
* Update runner.py by removing the otel from it

  Update runner.py by removing the otel from it

  Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
* added delta changes
* handle erro to destination db
* resolve tool instances id validation and hitl queu name in API
* handled direct execution from workflow page to worker and logs
* handle cost logs
* Update health.py

  Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci
* minor log changes
* introducing log consumer scheduler to bulk create, and socket .emit from worker for ws
* Commit uv.lock changes
* time limit or timeout celery config cleanup
* implemented redis client class in worker
* pipline status enum mismatch
* notification worker fixes
* resolve uv lock conflicts
* workflow log fixes
* ws channel name issue resolved. and handling redis down in status tracker, and removing redis keys
* default TTL changed for unified logs
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

---------

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>



What
Why
How
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
Database Migrations
`backend/connector_v2/migrations/0003_migrate_to_centralized_connectors.py` (`ENCRYPTION_KEY` env) and we have old connectors with the old `ENCRYPTION_KEY`. @chandrasekharan-zipstack please check this.

Env Config
Relevant Docs
Related Issues or PRs
Dependencies Versions
Notes on Testing
`/backend` and `/unstract` directories; `/workers` directory - workers implementation is work in progress

Screenshots
N/A - Backend infrastructure changes only
Checklist
I have read and understood the Contribution Guidelines.
Please focus your review on:
- `/backend/*` - New internal API implementations
- `/unstract/*` - Shared connector package changes
- `backend/connector_v2/migrations/0003_migrate_to_centralized_connectors.py`

Do NOT review:
- `/workers/*` - Workers are work in progress and not ready for review

The workers directory contains in-progress implementation that will be completed in subsequent PRs.