From 7ebac2e9facc0e36a4aa7cca8f1e1d078ac85bd3 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 9 Apr 2026 19:29:28 -0700 Subject: [PATCH 01/15] fix(workloads): use concurrent operations in r_w_q_workload, set 1M throughput - Uncomment concurrent upsert/read/query calls - Remove manual timing counters and log_request_counts - Set THROUGHPUT to 1000000 in workload_configs.py - Keep CIRCUIT_BREAKER_ENABLED = False (PPCB disabled) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/r_w_q_workload.py | 58 +++---------------- .../tests/workloads/workload_configs.py | 2 +- 2 files changed, 10 insertions(+), 50 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py index 0d7730c6f86e..89e35937fbfc 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py @@ -2,10 +2,6 @@ # Copyright (c) Microsoft Corporation. All rights reserved. import sys -from azure.cosmos import documents -from datetime import datetime, timezone -import time -from workload_utils import _get_upsert_item from workload_utils import * from workload_configs import * sys.path.append(r"/") @@ -13,32 +9,13 @@ from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio -async def log_request_counts(counter): - while True: - await asyncio.sleep(300) # 5 minutes - count = counter["count"] - duration = counter["upsert_time"] + counter["read_time"] - print("Current UTC time:", datetime.now(timezone.utc)) - print(f"Executed {count} requests in the last 5 minutes") - print(f"Errors in the last 5 minutes: {counter['error_count']}") - print(f"Per-request latency: {duration / count if count > 0 else 0} ms") - print(f"Upsert latency: {counter['upsert_time'] / (count / 2) if count > 0 else 0} ms") - print(f"Read latency: {counter['read_time'] / (count / 2) if count > 0 else 0} ms") - print("-------------------------------") - counter["count"] = 0 # reset for next interval - counter["upsert_time"] = 0 - counter["read_time"] = 0 - counter["error_count"] = 0 async def run_workload(client_id, client_logger): - counter = {"count": 0, "upsert_time": 0, "read_time": 0, "error_count": 0} - # Start background task - asyncio.create_task(log_request_counts(counter)) - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, + async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, + preferred_locations=PREFERRED_LOCATIONS, + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, user_agent=get_user_agent(client_id)) as client: db = client.get_database_client(COSMOS_DATABASE) cont = db.get_container_client(COSMOS_CONTAINER) @@ -46,32 +23,15 @@ async def run_workload(client_id, client_logger): while True: try: - upsert_start = time.perf_counter() - up_item = _get_upsert_item() - await cont.upsert_item(up_item) - elapsed = time.perf_counter() - upsert_start - counter["count"] += 1 - counter["upsert_time"] += elapsed - - read_start = time.perf_counter() - item = get_existing_random_item() - await cont.read_item(item["id"], item[PARTITION_KEY]) - elapsed = time.perf_counter() - read_start - counter["count"] += 1 - counter["read_time"] += elapsed - - # await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - # await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - # await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) except Exception as e: - counter["error_count"] += 1 client_logger.info("Exception in application layer") + client_logger.error(e) if __name__ == "__main__": file_name = os.path.basename(__file__) prefix, logger = create_logger(file_name) - create_inner_logger() - utc_now = datetime.now(timezone.utc) - print("Current UTC time:", utc_now) asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index fc4c640418d0..340577e1aa03 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -22,4 +22,4 @@ CONCURRENT_QUERIES = 2 PARTITION_KEY = "id" # id or pk NUMBER_OF_LOGICAL_PARTITIONS = 10000 -THROUGHPUT = 100000 +THROUGHPUT = 1000000 From 22a1117b4e983e1f8f27210282f99c3dcaafd510 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 9 Apr 2026 20:07:33 -0700 Subject: [PATCH 02/15] feat(workloads): add performance metrics collection for DR drill testing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a performance metrics library that reports PerfResult documents to a Cosmos DB results account, matching the Rust perf tool schema exactly so both SDKs feed the same ADX → Grafana pipeline. New files: - perf_stats.py: Thread-safe latency histogram with sorted-list percentile calculation and atomic drain_all() for consistent summary+error snapshots - perf_config.py: All config from environment variables (RESULTS_COSMOS_URI, PERF_REPORT_INTERVAL=300s, perfdb/perfresults defaults) - perf_reporter.py: Background daemon thread that drains Stats every 5 min and upserts PerfResult documents via sync CosmosClient with AAD auth Modified files: - workload_configs.py: All configs now driven by environment variables - workload_utils.py: Added timed operation wrappers with error tracking (CosmosHttpResponseError status_code/sub_status extraction), only successful operations record latency to avoid polluting percentiles - All *_workload.py files: Integrated Stats + PerfReporter with try/finally lifecycle management Key design decisions: - Sorted-list percentiles (no hdrhistogram native dependency) - psutil for CPU/memory with /proc fallback on Linux - Cached psutil.Process() instance for accurate CPU readings - CosmosClient stored and closed properly to avoid resource leaks - sdk_language='python', sdk_version from azure.cosmos.__version__ - PPCB disabled by default - All reporter errors caught and logged as warnings (never crash workload) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/perf_config.py | 34 +++ .../tests/workloads/perf_reporter.py | 253 ++++++++++++++++++ .../tests/workloads/perf_stats.py | 120 +++++++++ .../tests/workloads/r_proxy_workload.py | 57 ++-- .../tests/workloads/r_w_q_proxy_workload.py | 58 ++-- .../tests/workloads/r_w_q_sync_workload.py | 48 ++-- .../r_w_q_with_incorrect_client_workload.py | 48 ++-- .../tests/workloads/r_w_q_workload.py | 50 ++-- .../tests/workloads/r_workload.py | 46 ++-- .../tests/workloads/w_proxy_workload.py | 54 ++-- .../tests/workloads/w_workload.py | 44 +-- .../tests/workloads/workload_configs.py | 40 +-- .../tests/workloads/workload_utils.py | 186 +++++++++---- 13 files changed, 817 insertions(+), 221 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py create mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py create mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py new file mode 100644 index 000000000000..1c60a7b779d8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py @@ -0,0 +1,34 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +"""Performance reporting configuration from environment variables.""" + +import os +import subprocess +import uuid + + +def _get_git_sha() -> str: + """Get the current git commit SHA, or 'unknown' if unavailable.""" + try: + result = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + return result.stdout.strip() + except Exception: + pass + return "unknown" + + +def get_perf_config() -> dict: + """Build performance reporter configuration from environment variables.""" + return { + "enabled": os.environ.get("PERF_ENABLED", "true").lower() == "true", + "results_endpoint": os.environ.get("RESULTS_COSMOS_URI", ""), + "results_database": os.environ.get("RESULTS_COSMOS_DATABASE", "perfdb"), + "results_container": os.environ.get("RESULTS_COSMOS_CONTAINER", "perfresults"), + "report_interval": int(os.environ.get("PERF_REPORT_INTERVAL", "300")), + "workload_id": os.environ.get("PERF_WORKLOAD_ID", str(uuid.uuid4())), + "commit_sha": os.environ.get("PERF_COMMIT_SHA", _get_git_sha()), + } diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py new file mode 100644 index 000000000000..c3d77c8f0646 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py @@ -0,0 +1,253 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +"""Background reporter that drains Stats and upserts PerfResult documents to Cosmos DB.""" + +import logging +import os +import socket +import threading +import uuid +from datetime import datetime, timezone + +from perf_stats import Stats + +try: + import psutil +except ImportError: + psutil = None + +logger = logging.getLogger(__name__) + + +def _get_sdk_version() -> str: + """Get the azure-cosmos SDK version string.""" + try: + from azure.cosmos import __version__ + return __version__ + except Exception: + return "unknown" + + +def _get_cpu_percent(process=None) -> float: + """Get current process CPU percent.""" + if psutil and process: + try: + return process.cpu_percent(interval=None) + except Exception: + pass + return 0.0 + + +def _get_memory_bytes(process=None) -> int: + """Get current process RSS in bytes.""" + if psutil and process: + try: + return process.memory_info().rss + except Exception: + pass + # Fallback: parse /proc on Linux + try: + with open("/proc/self/status", "r") as f: + for line in f: + if line.startswith("VmRSS:"): + return int(line.split()[1]) * 1024 # kB to bytes + except Exception: + pass + return 0 + + +def _get_system_cpu_percent() -> float: + """Get system-wide CPU percent.""" + if psutil: + try: + return psutil.cpu_percent(interval=None) + except Exception: + pass + return 0.0 + + +def _get_system_memory() -> tuple[int, int]: + """Get system total and used memory in bytes.""" + if psutil: + try: + mem = psutil.virtual_memory() + return mem.total, mem.used + except Exception: + pass + # Fallback: parse /proc/meminfo + try: + info = {} + with open("/proc/meminfo", "r") as f: + for line in f: + parts = line.split() + if len(parts) >= 2: + info[parts[0].rstrip(":")] = int(parts[1]) * 1024 + total = info.get("MemTotal", 0) + available = info.get("MemAvailable", 0) + return total, total - available + except Exception: + pass + return 0, 0 + + +class PerfReporter: + """Background reporter that upserts PerfResult documents to Cosmos DB. + + Uses a daemon thread with a sync CosmosClient. The reporter drains + Stats at the configured interval and upserts one PerfResult document + per operation. All errors are caught and logged — the workload is + never affected. + """ + + def __init__(self, stats: Stats, config: dict): + self._stats = stats + self._config = config + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + self._client = None + self._container = None + self._hostname = socket.gethostname() + self._sdk_version = _get_sdk_version() + self._process = psutil.Process() if psutil else None + + def start(self): + """Start the background reporting thread (daemon).""" + self._thread = threading.Thread(target=self._run, daemon=True, name="perf-reporter") + self._thread.start() + logger.info("PerfReporter started (interval=%ds, workload_id=%s)", + self._config["report_interval"], self._config["workload_id"]) + + def stop(self): + """Stop the reporter and flush final results.""" + self._stop_event.set() + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=30) + # Final flush + try: + self._ensure_container() + self._flush() + except Exception as e: + logger.warning("PerfReporter final flush failed: %s", e) + # Close the CosmosClient to release connection pools + if self._client: + try: + self._client.close() + except Exception: + pass + logger.info("PerfReporter stopped") + + def _run(self): + """Reporter loop: wait for interval, then flush.""" + try: + self._ensure_container() + except Exception as e: + logger.warning("PerfReporter failed to initialize Cosmos client: %s", e) + return + + # Prime psutil CPU counters (first call always returns 0) + _get_cpu_percent(self._process) + _get_system_cpu_percent() + + while not self._stop_event.wait(timeout=self._config["report_interval"]): + try: + self._flush() + except Exception as e: + logger.warning("PerfReporter flush failed: %s", e) + + def _ensure_container(self): + """Lazily create the sync CosmosClient and get the container reference.""" + if self._container is not None: + return + + from azure.cosmos import CosmosClient, PartitionKey + from azure.identity import DefaultAzureCredential + + endpoint = self._config["results_endpoint"] + if not endpoint: + raise ValueError("RESULTS_COSMOS_URI is not set") + + credential = DefaultAzureCredential() + self._client = CosmosClient(endpoint, credential) + db = self._client.get_database_client(self._config["results_database"]) + self._container = db.get_container_client(self._config["results_container"]) + + def _flush(self): + """Drain stats and upsert PerfResult + ErrorResult documents.""" + if self._container is None: + return + + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + cpu = _get_cpu_percent(self._process) + mem = _get_memory_bytes(self._process) + sys_cpu = _get_system_cpu_percent() + sys_total, sys_used = _get_system_memory() + + # Import workload configs for config snapshot + concurrency = _safe_int_env("COSMOS_CONCURRENT_REQUESTS", 100) + preferred = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "") + excluded = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "") + ppcb = os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" + + # Atomically drain both summaries and errors + summaries, errors = self._stats.drain_all() + for s in summaries: + doc = { + "id": str(uuid.uuid4()), + "partition_key": str(uuid.uuid4()), + "workload_id": self._config["workload_id"], + "commit_sha": self._config["commit_sha"], + "hostname": self._hostname, + "TIMESTAMP": now, + "operation": s["operation"], + "count": s["count"], + "errors": s["errors"], + "min_ms": round(s["min_ms"], 3), + "max_ms": round(s["max_ms"], 3), + "mean_ms": round(s["mean_ms"], 3), + "p50_ms": round(s["p50_ms"], 3), + "p90_ms": round(s["p90_ms"], 3), + "p99_ms": round(s["p99_ms"], 3), + "cpu_percent": round(cpu, 1), + "memory_bytes": mem, + "system_cpu_percent": round(sys_cpu, 1), + "system_total_memory_bytes": sys_total, + "system_used_memory_bytes": sys_used, + "sdk_language": "python", + "sdk_version": self._sdk_version, + "config_concurrency": concurrency, + "config_application_region": preferred, + "config_excluded_regions": excluded, + "config_ppcb_enabled": ppcb, + } + try: + self._container.upsert_item(doc) + except Exception as e: + logger.warning("PerfReporter upsert failed for %s: %s", s["operation"], e) + + # Upsert error documents + for err in errors: + doc = { + "id": str(uuid.uuid4()), + "partition_key": str(uuid.uuid4()), + "workload_id": self._config["workload_id"], + "commit_sha": self._config["commit_sha"], + "hostname": self._hostname, + "TIMESTAMP": now, + "operation": err["operation"], + "error_message": err["error_message"][:2000], + "source_message": err["source_message"][:4000], + "sdk_language": "python", + "error_status_code": err.get("error_status_code"), + "error_sub_status_code": err.get("error_sub_status_code"), + } + try: + self._container.upsert_item(doc) + except Exception as e: + logger.warning("PerfReporter error upsert failed: %s", e) + + +def _safe_int_env(name: str, default: int) -> int: + try: + return int(os.environ.get(name, str(default))) + except (ValueError, TypeError): + return default diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py new file mode 100644 index 000000000000..f5b95321eeca --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -0,0 +1,120 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +"""Thread-safe per-operation latency histogram and error tracking.""" + +import threading +import time + + +class Stats: + """Thread-safe per-operation latency and error tracking. + + Uses a sorted-list percentile calculation to avoid native dependencies. + A background reporter drains accumulated data every reporting interval. + """ + + def __init__(self): + self._lock = threading.Lock() + self._latencies: dict[str, list[float]] = {} + self._error_counts: dict[str, int] = {} + self._errors: list[dict] = [] + + def record(self, operation: str, duration_ms: float): + """Record a successful operation with its duration in milliseconds.""" + with self._lock: + if operation not in self._latencies: + self._latencies[operation] = [] + self._error_counts[operation] = 0 + self._latencies[operation].append(duration_ms) + + def record_error(self, operation: str, error_msg: str, traceback_str: str, + status_code: int = None, sub_status_code: int = None): + """Record a failed operation with error details.""" + with self._lock: + if operation not in self._error_counts: + self._error_counts[operation] = 0 + self._latencies[operation] = [] + self._error_counts[operation] += 1 + self._errors.append({ + "operation": operation, + "error_message": error_msg, + "source_message": traceback_str, + "error_status_code": status_code, + "error_sub_status_code": sub_status_code, + "timestamp": time.time(), + }) + + def drain_all(self) -> tuple[list[dict], list[dict]]: + """Atomically drain both summaries and error details under one lock. + + Returns (summaries, errors) where summaries is a list of dicts with: + operation, count, errors, min_ms, max_ms, mean_ms, p50_ms, p90_ms, p99_ms + and errors is a list of dicts with: operation, error_message, source_message, + error_status_code, error_sub_status_code, timestamp. + """ + with self._lock: + summaries = [] + all_ops = set(self._latencies.keys()) | set(self._error_counts.keys()) + for op in sorted(all_ops): + latencies = self._latencies.get(op, []) + errors = self._error_counts.get(op, 0) + count = len(latencies) + if count == 0 and errors == 0: + continue + if count > 0: + latencies.sort() + total = sum(latencies) + summaries.append({ + "operation": op, + "count": count, + "errors": errors, + "min_ms": latencies[0], + "max_ms": latencies[-1], + "mean_ms": total / count, + "p50_ms": _percentile(latencies, 50.0), + "p90_ms": _percentile(latencies, 90.0), + "p99_ms": _percentile(latencies, 99.0), + }) + else: + summaries.append({ + "operation": op, + "count": 0, + "errors": errors, + "min_ms": 0.0, + "max_ms": 0.0, + "mean_ms": 0.0, + "p50_ms": 0.0, + "p90_ms": 0.0, + "p99_ms": 0.0, + }) + self._latencies.clear() + self._error_counts.clear() + error_details = self._errors + self._errors = [] + return summaries, error_details + + def drain_summaries(self) -> list[dict]: + """Drain accumulated stats and return per-operation summaries.""" + summaries, _ = self.drain_all() + return summaries + + def drain_errors(self) -> list[dict]: + """Drain accumulated error details.""" + _, errors = self.drain_all() + return errors + + +def _percentile(sorted_data: list[float], pct: float) -> float: + """Calculate percentile from pre-sorted data using nearest-rank method.""" + n = len(sorted_data) + if n == 0: + return 0.0 + if n == 1: + return sorted_data[0] + rank = (pct / 100.0) * (n - 1) + lower = int(rank) + upper = lower + 1 + if upper >= n: + return sorted_data[-1] + fraction = rank - lower + return sorted_data[lower] + fraction * (sorted_data[upper] - sorted_data[lower]) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py index c53ec13090ac..11d0b49191d9 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py @@ -2,33 +2,48 @@ # Copyright (c) Microsoft Corporation. All rights reserved. from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter + from azure.cosmos.aio import CosmosClient as AsyncClient from azure.core.pipeline.transport import AioHttpTransport import asyncio async def run_workload(client_id, client_logger): - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() + + try: + session = create_custom_session() + async with AsyncClient(COSMOS_URI, + COSMOS_CREDENTIAL, + multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, + preferred_locations=PREFERRED_LOCATIONS, + transport=AioHttpTransport(session=session, session_owner=False), + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id) + ) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) - while True: - try: - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + while True: + try: + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py index 925713b78c81..7609928ba822 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py @@ -4,34 +4,48 @@ from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio async def run_workload(client_id, client_logger): - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + session = create_custom_session() + async with AsyncClient(COSMOS_URI, + COSMOS_CREDENTIAL, + multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, + preferred_locations=PREFERRED_LOCATIONS, + transport=AioHttpTransport(session=session, session_owner=False), + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id) + ) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py index 5f50d7e6d140..30d051e92e17 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py @@ -4,28 +4,42 @@ from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter from azure.cosmos import CosmosClient, documents def run_workload(client_id, client_logger): - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - with CosmosClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - time.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - upsert_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - read_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - query_items(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + connectionPolicy = documents.ConnectionPolicy() + connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS + with CosmosClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, + preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, logger=client_logger, + user_agent=get_user_agent(client_id)) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + time.sleep(1) + + while True: + try: + upsert_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + read_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + query_items(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py index 0b585a621c1a..428ea2247dc0 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py @@ -4,29 +4,43 @@ from azure.cosmos import documents from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio async def run_workload(client_id, client_logger): - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - client = AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + connectionPolicy = documents.ConnectionPolicy() + connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS + client = AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, + preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, logger=client_logger, + user_agent=get_user_agent(client_id)) + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py index 89e35937fbfc..d25d934676db 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py @@ -4,6 +4,9 @@ from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter sys.path.append(r"/") from azure.cosmos.aio import CosmosClient as AsyncClient @@ -11,24 +14,35 @@ async def run_workload(client_id, client_logger): - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, - preferred_locations=PREFERRED_LOCATIONS, - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() + + try: + async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, + preferred_locations=PREFERRED_LOCATIONS, + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id)) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py index b412c683419d..cf7e78692f6a 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py @@ -4,28 +4,42 @@ from azure.cosmos import documents from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio async def run_workload(client_id, client_logger): - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + connectionPolicy = documents.ConnectionPolicy() + connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS + async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, + preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, logger=client_logger, + user_agent=get_user_agent(client_id)) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py index eea17899360a..ced330bb3545 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py @@ -3,33 +3,47 @@ from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter from azure.cosmos.aio import CosmosClient as AsyncClient from azure.core.pipeline.transport import AioHttpTransport import asyncio async def run_workload(client_id, client_logger): - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + session = create_custom_session() + async with AsyncClient(COSMOS_URI, + COSMOS_CREDENTIAL, + multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, + preferred_locations=PREFERRED_LOCATIONS, + transport=AioHttpTransport(session=session, session_owner=False), + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id) + ) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py index d40ed4dc1ea9..5b77cc0023ee 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py @@ -5,28 +5,42 @@ from azure.cosmos import documents from workload_utils import * from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter sys.path.append(r"/") from azure.cosmos.aio import CosmosClient as AsyncClient import asyncio async def run_workload(client_id, client_logger): - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) + try: + connectionPolicy = documents.ConnectionPolicy() + connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS + async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, + preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, logger=client_logger, + user_agent=get_user_agent(client_id)) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() if __name__ == "__main__": diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index 340577e1aa03..a1ca276fb10d 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -1,25 +1,27 @@ # The MIT License (MIT) # Copyright (c) Microsoft Corporation. All rights reserved. -# Replace with your Cosmos DB details +# All configuration is driven by environment variables with sensible defaults. import logging +import os + from azure.identity import DefaultAzureCredential -PREFERRED_LOCATIONS = [] -CLIENT_EXCLUDED_LOCATIONS = [] -REQUEST_EXCLUDED_LOCATIONS = [] -COSMOS_PROXY_URI = "0.0.0.0" -COSMOS_URI = "" -COSMOS_KEY = "" +PREFERRED_LOCATIONS = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_PREFERRED_LOCATIONS") else [] +CLIENT_EXCLUDED_LOCATIONS = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS") else [] +REQUEST_EXCLUDED_LOCATIONS = os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS") else [] +COSMOS_PROXY_URI = os.environ.get("COSMOS_PROXY_URI", "0.0.0.0") +COSMOS_URI = os.environ.get("COSMOS_URI", "") +COSMOS_KEY = os.environ.get("COSMOS_KEY", "") COSMOS_CREDENTIAL = COSMOS_KEY if COSMOS_KEY else DefaultAzureCredential() -COSMOS_CONTAINER = "scale_cont" -COSMOS_DATABASE = "scale_db" -USER_AGENT_PREFIX = "" -LOG_LEVEL = logging.DEBUG -APP_INSIGHTS_CONNECTION_STRING = "" -CIRCUIT_BREAKER_ENABLED = False -USE_MULTIPLE_WRITABLE_LOCATIONS = False -CONCURRENT_REQUESTS = 100 -CONCURRENT_QUERIES = 2 -PARTITION_KEY = "id" # id or pk -NUMBER_OF_LOGICAL_PARTITIONS = 10000 -THROUGHPUT = 1000000 +COSMOS_CONTAINER = os.environ.get("COSMOS_CONTAINER", "scale_cont") +COSMOS_DATABASE = os.environ.get("COSMOS_DATABASE", "scale_db") +USER_AGENT_PREFIX = os.environ.get("COSMOS_USER_AGENT_PREFIX", "") +LOG_LEVEL = getattr(logging, os.environ.get("COSMOS_LOG_LEVEL", "DEBUG"), logging.DEBUG) +APP_INSIGHTS_CONNECTION_STRING = os.environ.get("APP_INSIGHTS_CONNECTION_STRING", "") +CIRCUIT_BREAKER_ENABLED = os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" +USE_MULTIPLE_WRITABLE_LOCATIONS = os.environ.get("COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS", "false").lower() == "true" +CONCURRENT_REQUESTS = int(os.environ.get("COSMOS_CONCURRENT_REQUESTS", "100")) +CONCURRENT_QUERIES = int(os.environ.get("COSMOS_CONCURRENT_QUERIES", "2")) +PARTITION_KEY = os.environ.get("COSMOS_PARTITION_KEY", "id") +NUMBER_OF_LOGICAL_PARTITIONS = int(os.environ.get("COSMOS_NUMBER_OF_LOGICAL_PARTITIONS", "10000")) +THROUGHPUT = int(os.environ.get("COSMOS_THROUGHPUT", "1000000")) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py index fe3d69b3bfbe..2697ae69d808 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py @@ -4,11 +4,14 @@ import os import random import sys +import time +import traceback import uuid from datetime import datetime from logging.handlers import RotatingFileHandler from aiohttp import ClientSession from azure.monitor.opentelemetry import configure_azure_monitor +from azure.cosmos.exceptions import CosmosHttpResponseError from custom_tcp_connector import ProxiedTCPConnector from workload_configs import * @@ -53,73 +56,159 @@ def _get_upsert_item(): # 10 percent of the time, create a new item instead of updating an existing one return create_random_item() if random.random() < 0.1 else get_existing_random_item() -def upsert_item(container, excluded_locations, num_upserts): +def _record_error(stats, operation, error): + """Extract Cosmos status codes and record the error in stats.""" + status_code = sub_status_code = None + if isinstance(error, CosmosHttpResponseError): + status_code = error.status_code + sub_status_code = getattr(error, 'sub_status', None) + stats.record_error(operation, str(error), traceback.format_exc(), + status_code, sub_status_code) + + +def upsert_item(container, excluded_locations, num_upserts, stats=None): item = _get_upsert_item() for _ in range(num_upserts): - if excluded_locations: - container.upsert_item(item, etag=None, match_condition=None, - excluded_locations=excluded_locations) - else: - container.upsert_item(item, etag=None, match_condition=None) + start = time.perf_counter() + try: + if excluded_locations: + container.upsert_item(item, etag=None, match_condition=None, + excluded_locations=excluded_locations) + else: + container.upsert_item(item, etag=None, match_condition=None) + if stats: + stats.record("UpsertItem", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "UpsertItem", e) + raise -def read_item(container, excluded_locations, num_reads): +def read_item(container, excluded_locations, num_reads, stats=None): for _ in range(num_reads): item = get_existing_random_item() - if excluded_locations: - container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, - excluded_locations=excluded_locations) - else: - container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) + start = time.perf_counter() + try: + if excluded_locations: + container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, + excluded_locations=excluded_locations) + else: + container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) + if stats: + stats.record("ReadItem", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "ReadItem", e) + raise -def query_items(container, excluded_locations, num_queries): +def query_items(container, excluded_locations, num_queries, stats=None): for _ in range(num_queries): - perform_query(container, excluded_locations) + perform_query(container, excluded_locations, stats) -def perform_query(container, excluded_locations): +def perform_query(container, excluded_locations, stats=None): random_item = get_existing_random_item() - if excluded_locations: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY], + start = time.perf_counter() + try: + if excluded_locations: + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item[PARTITION_KEY], + excluded_locations=excluded_locations) + else: + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item[PARTITION_KEY]) + items = [item for item in results] + if stats: + stats.record("QueryItems", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "QueryItems", e) + raise + + +async def _timed_upsert_async(container, item, excluded_locations, stats): + """Single async upsert with timing and error tracking.""" + start = time.perf_counter() + try: + if excluded_locations: + await container.upsert_item(item, etag=None, match_condition=None, excluded_locations=excluded_locations) - else: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY]) - items = [item for item in results] - -async def upsert_item_concurrently(container, excluded_locations, num_upserts): + else: + await container.upsert_item(item, etag=None, match_condition=None) + if stats: + stats.record("UpsertItem", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "UpsertItem", e) + raise + + +async def _timed_read_async(container, item, excluded_locations, stats): + """Single async read with timing and error tracking.""" + start = time.perf_counter() + try: + if excluded_locations: + await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, + excluded_locations=excluded_locations) + else: + await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) + if stats: + stats.record("ReadItem", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "ReadItem", e) + raise + + +async def _timed_query_async(container, random_item, excluded_locations, stats): + """Single async query with timing and error tracking.""" + start = time.perf_counter() + try: + if excluded_locations: + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item[PARTITION_KEY], + excluded_locations=excluded_locations) + else: + results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[{"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}], + partition_key=random_item[PARTITION_KEY]) + items = [item async for item in results] + if stats: + stats.record("QueryItems", (time.perf_counter() - start) * 1000) + except Exception as e: + if stats: + _record_error(stats, "QueryItems", e) + raise + + +async def upsert_item_concurrently(container, excluded_locations, num_upserts, stats=None): tasks = [] for _ in range(num_upserts): item = _get_upsert_item() - if excluded_locations: - tasks.append(container.upsert_item(item, etag=None, match_condition=None, - excluded_locations=excluded_locations)) - else: - tasks.append(container.upsert_item(item, etag=None, match_condition=None)) + tasks.append(_timed_upsert_async(container, item, excluded_locations, stats)) await asyncio.gather(*tasks) -async def read_item_concurrently(container, excluded_locations, num_reads): +async def read_item_concurrently(container, excluded_locations, num_reads, stats=None): tasks = [] for _ in range(num_reads): item = get_existing_random_item() - if excluded_locations: - tasks.append(container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, - excluded_locations=excluded_locations)) - else: - tasks.append(container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None)) + tasks.append(_timed_read_async(container, item, excluded_locations, stats)) await asyncio.gather(*tasks) -async def query_items_concurrently(container, excluded_locations, num_queries): +async def query_items_concurrently(container, excluded_locations, num_queries, stats=None): tasks = [] for _ in range(num_queries): - tasks.append(perform_query_concurrently(container, excluded_locations)) + random_item = get_existing_random_item() + tasks.append(_timed_query_async(container, random_item, excluded_locations, stats)) await asyncio.gather(*tasks) def create_custom_session(): @@ -133,21 +222,6 @@ def create_custom_session(): session = ClientSession(connector=proxied_connector) return session -async def perform_query_concurrently(container, excluded_locations): - random_item = get_existing_random_item() - if excluded_locations: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY], - excluded_locations=excluded_locations) - else: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY]) - items = [item async for item in results] - def create_logger(file_name): os.environ["AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"] = str(CIRCUIT_BREAKER_ENABLED) logger = logging.getLogger() From 55c4fc40d77b3575f01de04b95c1df2a9d16d914 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Thu, 9 Apr 2026 23:26:40 -0700 Subject: [PATCH 03/15] fix(workloads): require psutil, remove /proc fallback code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit psutil is now a hard import (not optional). Removed all /proc/meminfo and /proc/self/status fallback parsing — if psutil is not installed, the import will fail immediately rather than silently degrading. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/perf_reporter.py | 78 +++++-------------- 1 file changed, 21 insertions(+), 57 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py index c3d77c8f0646..6e1b2606a3dd 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py @@ -9,12 +9,9 @@ import uuid from datetime import datetime, timezone -from perf_stats import Stats +import psutil -try: - import psutil -except ImportError: - psutil = None +from perf_stats import Stats logger = logging.getLogger(__name__) @@ -28,66 +25,37 @@ def _get_sdk_version() -> str: return "unknown" -def _get_cpu_percent(process=None) -> float: +def _get_cpu_percent(process) -> float: """Get current process CPU percent.""" - if psutil and process: - try: - return process.cpu_percent(interval=None) - except Exception: - pass - return 0.0 + try: + return process.cpu_percent(interval=None) + except Exception: + return 0.0 -def _get_memory_bytes(process=None) -> int: +def _get_memory_bytes(process) -> int: """Get current process RSS in bytes.""" - if psutil and process: - try: - return process.memory_info().rss - except Exception: - pass - # Fallback: parse /proc on Linux try: - with open("/proc/self/status", "r") as f: - for line in f: - if line.startswith("VmRSS:"): - return int(line.split()[1]) * 1024 # kB to bytes + return process.memory_info().rss except Exception: - pass - return 0 + return 0 def _get_system_cpu_percent() -> float: """Get system-wide CPU percent.""" - if psutil: - try: - return psutil.cpu_percent(interval=None) - except Exception: - pass - return 0.0 + try: + return psutil.cpu_percent(interval=None) + except Exception: + return 0.0 -def _get_system_memory() -> tuple[int, int]: +def _get_system_memory() -> tuple: """Get system total and used memory in bytes.""" - if psutil: - try: - mem = psutil.virtual_memory() - return mem.total, mem.used - except Exception: - pass - # Fallback: parse /proc/meminfo try: - info = {} - with open("/proc/meminfo", "r") as f: - for line in f: - parts = line.split() - if len(parts) >= 2: - info[parts[0].rstrip(":")] = int(parts[1]) * 1024 - total = info.get("MemTotal", 0) - available = info.get("MemAvailable", 0) - return total, total - available + mem = psutil.virtual_memory() + return mem.total, mem.used except Exception: - pass - return 0, 0 + return 0, 0 class PerfReporter: @@ -103,12 +71,12 @@ def __init__(self, stats: Stats, config: dict): self._stats = stats self._config = config self._stop_event = threading.Event() - self._thread: threading.Thread | None = None + self._thread = None self._client = None self._container = None self._hostname = socket.gethostname() self._sdk_version = _get_sdk_version() - self._process = psutil.Process() if psutil else None + self._process = psutil.Process() def start(self): """Start the background reporting thread (daemon).""" @@ -128,7 +96,6 @@ def stop(self): self._flush() except Exception as e: logger.warning("PerfReporter final flush failed: %s", e) - # Close the CosmosClient to release connection pools if self._client: try: self._client.close() @@ -159,7 +126,7 @@ def _ensure_container(self): if self._container is not None: return - from azure.cosmos import CosmosClient, PartitionKey + from azure.cosmos import CosmosClient from azure.identity import DefaultAzureCredential endpoint = self._config["results_endpoint"] @@ -182,13 +149,11 @@ def _flush(self): sys_cpu = _get_system_cpu_percent() sys_total, sys_used = _get_system_memory() - # Import workload configs for config snapshot concurrency = _safe_int_env("COSMOS_CONCURRENT_REQUESTS", 100) preferred = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "") excluded = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "") ppcb = os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" - # Atomically drain both summaries and errors summaries, errors = self._stats.drain_all() for s in summaries: doc = { @@ -224,7 +189,6 @@ def _flush(self): except Exception as e: logger.warning("PerfReporter upsert failed for %s: %s", s["operation"], e) - # Upsert error documents for err in errors: doc = { "id": str(uuid.uuid4()), From 705c8560a4977b9de0760ddca001c2154527ff0f Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:16:37 -0700 Subject: [PATCH 04/15] refactor(workloads): unify to single workload.py with env-var config - Single workload.py replaces 6 operation-specific files - WORKLOAD_OPERATIONS env var controls which ops run (read,write,query) - WORKLOAD_USE_PROXY env var enables Envoy proxy routing - WORKLOAD_USE_SYNC env var enables sync client - Validate operation names at import time with clear error - Replace manual sorted-list percentiles with hdrhistogram (O(1) record/query) - Fixed memory usage (~40KB per histogram vs unbounded list growth) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/perf_stats.py | 68 +++++----- .../tests/workloads/run_workloads.sh | 21 +-- .../azure-cosmos/tests/workloads/workload.py | 126 ++++++++++++++++++ .../tests/workloads/workload_configs.py | 13 ++ 4 files changed, 182 insertions(+), 46 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/workload.py diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py index f5b95321eeca..6f8d05ae38fd 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -1,31 +1,42 @@ # The MIT License (MIT) # Copyright (c) Microsoft Corporation. All rights reserved. -"""Thread-safe per-operation latency histogram and error tracking.""" +"""Thread-safe per-operation latency histogram and error tracking using HdrHistogram.""" import threading import time +try: + from hdrhistogram import HdrHistogram +except ImportError: + raise ImportError( + "hdrhistogram is required for perf_stats. " + "Install it with: pip install hdrhistogram" + ) + class Stats: - """Thread-safe per-operation latency and error tracking. + """Thread-safe per-operation latency and error tracking using HdrHistogram. - Uses a sorted-list percentile calculation to avoid native dependencies. - A background reporter drains accumulated data every reporting interval. + Uses HdrHistogram for O(1) record/query with fixed ~40KB memory per histogram, + replacing the previous sorted-list approach that grew unbounded. + Values are stored in microseconds internally for sub-ms precision. """ def __init__(self): self._lock = threading.Lock() - self._latencies: dict[str, list[float]] = {} + # min 1 microsecond, max 60 seconds (in microseconds), 3 significant digits + self._histograms: dict[str, HdrHistogram] = {} self._error_counts: dict[str, int] = {} self._errors: list[dict] = [] def record(self, operation: str, duration_ms: float): """Record a successful operation with its duration in milliseconds.""" with self._lock: - if operation not in self._latencies: - self._latencies[operation] = [] + if operation not in self._histograms: + self._histograms[operation] = HdrHistogram(1, 60_000_000, 3) self._error_counts[operation] = 0 - self._latencies[operation].append(duration_ms) + # Record in microseconds for sub-ms precision + self._histograms[operation].record_value(max(1, int(duration_ms * 1000))) def record_error(self, operation: str, error_msg: str, traceback_str: str, status_code: int = None, sub_status_code: int = None): @@ -33,7 +44,7 @@ def record_error(self, operation: str, error_msg: str, traceback_str: str, with self._lock: if operation not in self._error_counts: self._error_counts[operation] = 0 - self._latencies[operation] = [] + self._histograms[operation] = HdrHistogram(1, 60_000_000, 3) self._error_counts[operation] += 1 self._errors.append({ "operation": operation, @@ -54,26 +65,24 @@ def drain_all(self) -> tuple[list[dict], list[dict]]: """ with self._lock: summaries = [] - all_ops = set(self._latencies.keys()) | set(self._error_counts.keys()) + all_ops = set(list(self._histograms.keys()) + list(self._error_counts.keys())) for op in sorted(all_ops): - latencies = self._latencies.get(op, []) + hist = self._histograms.get(op) errors = self._error_counts.get(op, 0) - count = len(latencies) + count = hist.total_count if hist else 0 if count == 0 and errors == 0: continue if count > 0: - latencies.sort() - total = sum(latencies) summaries.append({ "operation": op, "count": count, "errors": errors, - "min_ms": latencies[0], - "max_ms": latencies[-1], - "mean_ms": total / count, - "p50_ms": _percentile(latencies, 50.0), - "p90_ms": _percentile(latencies, 90.0), - "p99_ms": _percentile(latencies, 99.0), + "min_ms": hist.min_value / 1000.0, + "max_ms": hist.max_value / 1000.0, + "mean_ms": hist.mean_value / 1000.0, + "p50_ms": hist.get_value_at_percentile(50.0) / 1000.0, + "p90_ms": hist.get_value_at_percentile(90.0) / 1000.0, + "p99_ms": hist.get_value_at_percentile(99.0) / 1000.0, }) else: summaries.append({ @@ -87,7 +96,8 @@ def drain_all(self) -> tuple[list[dict], list[dict]]: "p90_ms": 0.0, "p99_ms": 0.0, }) - self._latencies.clear() + # Reset for next interval + self._histograms.clear() self._error_counts.clear() error_details = self._errors self._errors = [] @@ -102,19 +112,3 @@ def drain_errors(self) -> list[dict]: """Drain accumulated error details.""" _, errors = self.drain_all() return errors - - -def _percentile(sorted_data: list[float], pct: float) -> float: - """Calculate percentile from pre-sorted data using nearest-rank method.""" - n = len(sorted_data) - if n == 0: - return 0.0 - if n == 1: - return sorted_data[0] - rank = (pct / 100.0) * (n - 1) - lower = int(rank) - upper = lower + 1 - if upper >= n: - return sorted_data[-1] - fraction = rank - lower - return sorted_data[lower] + fraction * (sorted_data[upper] - sorted_data[lower]) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh b/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh index 9d3f6ecbf62f..836507c4a25d 100755 --- a/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh +++ b/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh @@ -1,10 +1,15 @@ #!/bin/bash if [ $# -eq 0 ]; then - echo "Usage: $0 num_runs" + echo "Usage: $0 num_runs [operations] [proxy]" + echo " num_runs: number of processes per config" + echo " operations: comma-separated (default: read,write,query)" + echo " proxy: true/false (default: false)" exit 1 fi num_runs=$1 +operations=${2:-read,write,query} +use_proxy=${3:-false} echo "[Info] Installing azure-cosmos package..." pip install ../../. @@ -14,13 +19,11 @@ if [ $? -ne 0 ]; then fi echo "[Info] azure-cosmos installed successfully." -# Loop over each Python file in the current directory ending with _workload.py -for file in ./*_workload.py; do - for (( i=0; i /dev/null 2>&1 & + sleep 1 done -echo "[Info] All workloads started successfully." +echo "[Info] All workloads started." diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py new file mode 100644 index 000000000000..731cb77a38b8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +"""Unified Cosmos DB workload — operations, proxy, and sync/async controlled by env vars. + +Environment variables: + WORKLOAD_OPERATIONS comma-separated list of operations (default: read,write,query) + WORKLOAD_USE_PROXY route through Envoy proxy (default: false) + WORKLOAD_USE_SYNC use sync client instead of async (default: false) +""" + +import os +import asyncio +import time + +from azure.cosmos.aio import CosmosClient as AsyncClient +from azure.cosmos import CosmosClient as SyncClient, documents +from azure.core.pipeline.transport._aiohttp import AioHttpTransport + +from workload_utils import * +from workload_configs import * +from perf_config import get_perf_config +from perf_stats import Stats +from perf_reporter import PerfReporter + + +async def run_workload_async(client_id, client_logger): + """Async workload loop — default mode.""" + ops = WORKLOAD_OPERATIONS + use_proxy = WORKLOAD_USE_PROXY + + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() + + try: + transport = None + if use_proxy: + session = create_custom_session() + transport = AioHttpTransport(session=session, session_owner=False) + + client_kwargs = dict( + preferred_locations=PREFERRED_LOCATIONS, + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id), + ) + if use_proxy and transport: + client_kwargs["transport"] = transport + if USE_MULTIPLE_WRITABLE_LOCATIONS: + client_kwargs["multiple_write_locations"] = True + + async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, **client_kwargs) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + await asyncio.sleep(1) + + while True: + try: + if "write" in ops: + await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + if "read" in ops: + await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + if "query" in ops: + await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() + + +def run_workload_sync(client_id, client_logger): + """Sync workload loop — used when WORKLOAD_USE_SYNC=true.""" + ops = WORKLOAD_OPERATIONS + + stats = Stats() + perf_config = get_perf_config() + reporter = None + if perf_config["enabled"] and perf_config["results_endpoint"]: + reporter = PerfReporter(stats, perf_config) + reporter.start() + + try: + connection_policy = documents.ConnectionPolicy() + connection_policy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS + + with SyncClient(COSMOS_URI, COSMOS_CREDENTIAL, + connection_policy=connection_policy, + preferred_locations=PREFERRED_LOCATIONS, + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id)) as client: + db = client.get_database_client(COSMOS_DATABASE) + cont = db.get_container_client(COSMOS_CONTAINER) + time.sleep(1) + + while True: + try: + if "write" in ops: + upsert_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + if "read" in ops: + read_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + if "query" in ops: + query_items(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + except Exception as e: + client_logger.info("Exception in application layer") + client_logger.error(e) + finally: + if reporter: + reporter.stop() + + +if __name__ == "__main__": + file_name = os.path.basename(__file__) + prefix, logger = create_logger(file_name) + if WORKLOAD_USE_SYNC: + run_workload_sync(prefix, logger) + else: + asyncio.run(run_workload_async(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index a1ca276fb10d..c0123677d9a1 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -25,3 +25,16 @@ PARTITION_KEY = os.environ.get("COSMOS_PARTITION_KEY", "id") NUMBER_OF_LOGICAL_PARTITIONS = int(os.environ.get("COSMOS_NUMBER_OF_LOGICAL_PARTITIONS", "10000")) THROUGHPUT = int(os.environ.get("COSMOS_THROUGHPUT", "1000000")) + +# Workload behavior +_VALID_OPERATIONS = {"read", "write", "query"} +WORKLOAD_OPERATIONS = frozenset( + op.strip().lower() + for op in os.environ.get("WORKLOAD_OPERATIONS", "read,write,query").split(",") + if op.strip() +) +_unknown_ops = WORKLOAD_OPERATIONS - _VALID_OPERATIONS +if _unknown_ops: + raise ValueError(f"Unknown WORKLOAD_OPERATIONS: {_unknown_ops}. Valid: {_VALID_OPERATIONS}") +WORKLOAD_USE_PROXY = os.environ.get("WORKLOAD_USE_PROXY", "false").lower() == "true" +WORKLOAD_USE_SYNC = os.environ.get("WORKLOAD_USE_SYNC", "false").lower() == "true" From 356ac4163801db275bcc5454b7d85730190b593d Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:17:33 -0700 Subject: [PATCH 05/15] refactor(workloads): delete old workload files replaced by unified workload.py Removed: r_workload.py, w_workload.py, r_proxy_workload.py, w_proxy_workload.py, r_w_q_workload.py, r_w_q_proxy_workload.py, r_w_q_sync_workload.py All replaced by workload.py with WORKLOAD_OPERATIONS and WORKLOAD_USE_PROXY env vars. Kept: r_w_q_with_incorrect_client_workload.py (special test case) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/r_proxy_workload.py | 52 ------------------ .../tests/workloads/r_w_q_proxy_workload.py | 54 ------------------- .../tests/workloads/r_w_q_sync_workload.py | 48 ----------------- .../tests/workloads/r_w_q_workload.py | 51 ------------------ .../tests/workloads/r_workload.py | 48 ----------------- .../tests/workloads/w_proxy_workload.py | 52 ------------------ .../tests/workloads/w_workload.py | 49 ----------------- 7 files changed, 354 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py deleted file mode 100644 index 11d0b49191d9..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_proxy_workload.py +++ /dev/null @@ -1,52 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos.aio import CosmosClient as AsyncClient -from azure.core.pipeline.transport import AioHttpTransport -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py deleted file mode 100644 index 7609928ba822..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_proxy_workload.py +++ /dev/null @@ -1,54 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. -from azure.core.pipeline.transport._aiohttp import AioHttpTransport - -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos.aio import CosmosClient as AsyncClient -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py deleted file mode 100644 index 30d051e92e17..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_sync_workload.py +++ /dev/null @@ -1,48 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. -import time - -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos import CosmosClient, documents - -def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - with CosmosClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - time.sleep(1) - - while True: - try: - upsert_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - read_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - query_items(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - run_workload(prefix, logger) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py deleted file mode 100644 index d25d934676db..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_workload.py +++ /dev/null @@ -1,51 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. -import sys - -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter -sys.path.append(r"/") - -from azure.cosmos.aio import CosmosClient as AsyncClient -import asyncio - - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, - preferred_locations=PREFERRED_LOCATIONS, - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py deleted file mode 100644 index cf7e78692f6a..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_workload.py +++ /dev/null @@ -1,48 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. - -from azure.cosmos import documents -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos.aio import CosmosClient as AsyncClient -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py deleted file mode 100644 index ced330bb3545..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/w_proxy_workload.py +++ /dev/null @@ -1,52 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. - -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos.aio import CosmosClient as AsyncClient -from azure.core.pipeline.transport import AioHttpTransport -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - session = create_custom_session() - async with AsyncClient(COSMOS_URI, - COSMOS_CREDENTIAL, - multiple_write_locations=USE_MULTIPLE_WRITABLE_LOCATIONS, - preferred_locations=PREFERRED_LOCATIONS, - transport=AioHttpTransport(session=session, session_owner=False), - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id) - ) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py deleted file mode 100644 index 5b77cc0023ee..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/w_workload.py +++ /dev/null @@ -1,49 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. -import sys - -from azure.cosmos import documents -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter -sys.path.append(r"/") - -from azure.cosmos.aio import CosmosClient as AsyncClient -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) as client: - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) From 45122c2d4f1eabfe8a8eddb09122d4f8b0af55f6 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:34:15 -0700 Subject: [PATCH 06/15] feat(workloads): add WORKLOAD_SKIP_CLOSE to simulate unclosed clients Replaces r_w_q_with_incorrect_client_workload.py with an env var: WORKLOAD_SKIP_CLOSE=true creates the client without a context manager, simulating applications that don't properly close the Cosmos client. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../r_w_q_with_incorrect_client_workload.py | 49 ------------------- .../azure-cosmos/tests/workloads/workload.py | 12 ++++- .../tests/workloads/workload_configs.py | 4 ++ 3 files changed, 15 insertions(+), 50 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py deleted file mode 100644 index 428ea2247dc0..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/r_w_q_with_incorrect_client_workload.py +++ /dev/null @@ -1,49 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) Microsoft Corporation. All rights reserved. - -from azure.cosmos import documents -from workload_utils import * -from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter - -from azure.cosmos.aio import CosmosClient as AsyncClient -import asyncio - -async def run_workload(client_id, client_logger): - stats = Stats() - perf_config = get_perf_config() - reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() - - try: - connectionPolicy = documents.ConnectionPolicy() - connectionPolicy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - client = AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, connection_policy=connectionPolicy, - preferred_locations=PREFERRED_LOCATIONS, excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, logger=client_logger, - user_agent=get_user_agent(client_id)) - db = client.get_database_client(COSMOS_DATABASE) - cont = db.get_container_client(COSMOS_CONTAINER) - await asyncio.sleep(1) - - while True: - try: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) - except Exception as e: - client_logger.info("Exception in application layer") - client_logger.error(e) - finally: - if reporter: - reporter.stop() - - -if __name__ == "__main__": - file_name = os.path.basename(__file__) - prefix, logger = create_logger(file_name) - asyncio.run(run_workload(prefix, logger)) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py index 731cb77a38b8..115afc8397a5 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py @@ -54,7 +54,14 @@ async def run_workload_async(client_id, client_logger): if USE_MULTIPLE_WRITABLE_LOCATIONS: client_kwargs["multiple_write_locations"] = True - async with AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, **client_kwargs) as client: + if WORKLOAD_SKIP_CLOSE: + # Simulate applications that don't properly close the client + client = AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, **client_kwargs) + else: + client = AsyncClient(COSMOS_URI, COSMOS_CREDENTIAL, **client_kwargs) + await client.__aenter__() + + try: db = client.get_database_client(COSMOS_DATABASE) cont = db.get_container_client(COSMOS_CONTAINER) await asyncio.sleep(1) @@ -70,6 +77,9 @@ async def run_workload_async(client_id, client_logger): except Exception as e: client_logger.info("Exception in application layer") client_logger.error(e) + finally: + if not WORKLOAD_SKIP_CLOSE: + await client.__aexit__(None, None, None) finally: if reporter: reporter.stop() diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index c0123677d9a1..f1e81c09e12f 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -38,3 +38,7 @@ raise ValueError(f"Unknown WORKLOAD_OPERATIONS: {_unknown_ops}. Valid: {_VALID_OPERATIONS}") WORKLOAD_USE_PROXY = os.environ.get("WORKLOAD_USE_PROXY", "false").lower() == "true" WORKLOAD_USE_SYNC = os.environ.get("WORKLOAD_USE_SYNC", "false").lower() == "true" + +# When true, the client is created without a context manager (no automatic close). +# Simulates applications that don't properly close the Cosmos client. +WORKLOAD_SKIP_CLOSE = os.environ.get("WORKLOAD_SKIP_CLOSE", "false").lower() == "true" From 14d7797934e42003be5e521639b1b9822d69f58a Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:39:13 -0700 Subject: [PATCH 07/15] perf(workloads): use perf_counter_ns for higher precision timing Switch from time.perf_counter() * 1000 to time.perf_counter_ns() / 1_000_000 for nanosecond precision without floating-point multiplication artifacts. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/workload_utils.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py index 2697ae69d808..a2e65edbda7d 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py @@ -69,7 +69,7 @@ def _record_error(stats, operation, error): def upsert_item(container, excluded_locations, num_upserts, stats=None): item = _get_upsert_item() for _ in range(num_upserts): - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: container.upsert_item(item, etag=None, match_condition=None, @@ -77,7 +77,7 @@ def upsert_item(container, excluded_locations, num_upserts, stats=None): else: container.upsert_item(item, etag=None, match_condition=None) if stats: - stats.record("UpsertItem", (time.perf_counter() - start) * 1000) + stats.record("UpsertItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "UpsertItem", e) @@ -87,7 +87,7 @@ def upsert_item(container, excluded_locations, num_upserts, stats=None): def read_item(container, excluded_locations, num_reads, stats=None): for _ in range(num_reads): item = get_existing_random_item() - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, @@ -95,7 +95,7 @@ def read_item(container, excluded_locations, num_reads, stats=None): else: container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) if stats: - stats.record("ReadItem", (time.perf_counter() - start) * 1000) + stats.record("ReadItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "ReadItem", e) @@ -108,7 +108,7 @@ def query_items(container, excluded_locations, num_queries, stats=None): def perform_query(container, excluded_locations, stats=None): random_item = get_existing_random_item() - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", @@ -123,7 +123,7 @@ def perform_query(container, excluded_locations, stats=None): partition_key=random_item[PARTITION_KEY]) items = [item for item in results] if stats: - stats.record("QueryItems", (time.perf_counter() - start) * 1000) + stats.record("QueryItems", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "QueryItems", e) @@ -132,7 +132,7 @@ def perform_query(container, excluded_locations, stats=None): async def _timed_upsert_async(container, item, excluded_locations, stats): """Single async upsert with timing and error tracking.""" - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: await container.upsert_item(item, etag=None, match_condition=None, @@ -140,7 +140,7 @@ async def _timed_upsert_async(container, item, excluded_locations, stats): else: await container.upsert_item(item, etag=None, match_condition=None) if stats: - stats.record("UpsertItem", (time.perf_counter() - start) * 1000) + stats.record("UpsertItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "UpsertItem", e) @@ -149,7 +149,7 @@ async def _timed_upsert_async(container, item, excluded_locations, stats): async def _timed_read_async(container, item, excluded_locations, stats): """Single async read with timing and error tracking.""" - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, @@ -157,7 +157,7 @@ async def _timed_read_async(container, item, excluded_locations, stats): else: await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) if stats: - stats.record("ReadItem", (time.perf_counter() - start) * 1000) + stats.record("ReadItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "ReadItem", e) @@ -166,7 +166,7 @@ async def _timed_read_async(container, item, excluded_locations, stats): async def _timed_query_async(container, random_item, excluded_locations, stats): """Single async query with timing and error tracking.""" - start = time.perf_counter() + start = time.perf_counter_ns() try: if excluded_locations: results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", @@ -181,7 +181,7 @@ async def _timed_query_async(container, random_item, excluded_locations, stats): partition_key=random_item[PARTITION_KEY]) items = [item async for item in results] if stats: - stats.record("QueryItems", (time.perf_counter() - start) * 1000) + stats.record("QueryItems", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: if stats: _record_error(stats, "QueryItems", e) From 5021b4722f9aeec414401946698e0487cdbf3d09 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:44:55 -0700 Subject: [PATCH 08/15] refactor(workloads): remove run_workloads.sh and dev.md Infra/orchestration scripts belong in the cosmos-sdk-copilot-toolkit repo, not in the SDK repo. Workload code (workload.py, perf_*, workload_utils.py) stays here. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../azure-cosmos/tests/workloads/dev.md | 45 ------------------- .../tests/workloads/run_workloads.sh | 29 ------------ 2 files changed, 74 deletions(-) delete mode 100644 sdk/cosmos/azure-cosmos/tests/workloads/dev.md delete mode 100755 sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/dev.md b/sdk/cosmos/azure-cosmos/tests/workloads/dev.md deleted file mode 100644 index 0d12d2375223..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/dev.md +++ /dev/null @@ -1,45 +0,0 @@ -## SDK Scale Testing -This directory contains the scale testing workloads for the SDK. The workloads are designed to test the performance -and scalability of the SDK under various conditions. There are different types of workloads and each will create a log -file when run. These logs are named in this format `--.log`. - -### Setup Scale Testing -1. Create a VM in Azure with the following configuration: - - 8 vCPUs - - 32 GB RAM - - Ubuntu - - Accelerated networking -1. Give the VM necessary [permissions](https://learn.microsoft.com/azure/cosmos-db/nosql/how-to-grant-data-plane-access?tabs=built-in-definition%2Ccsharp&pivots=azure-interface-cli) to access the Cosmos DB account if using AAD (Optional). -1. Create an Azure App Insights Resource (Optional) -1. Fork and clone this repository -1. Go to azure cosmos folder - - `cd azure-sdk-for-python/sdk/cosmos/azure-cosmos` -1. Install the required packages and create virtual environment - - `setup_env.sh` - - `source azure-cosmosdb-sdk-environment/bin/activate` -1. Checkout the branch with the changes to test. -1. Install azure-cosmos - - `pip install .` -1. Go to workloads folder - - `cd tests/workloads` -1. Fill out relevant configs in `workload_configs.py`: key, host, etc -1. Go to envoy folder and generate envoy configuration file using template. Template files are in `envoy/templates` directory. `` is your Cosmos DB account name. - - `cd envoy` - - `./generate_envoy_config.sh ` -1. Start envoy using the generated configuration file - - `mkdir logs` - - `envoy -c .yaml --log-level debug --log-path logs/debug.txt` -1. Run the setup workload to create the database and containers and insert data - - `python3 initial-setup.py` -1. Run the scale workloads - - `./run_workloads.sh ` - -### Monitor Run -- `ps -eaf | grep "python3"` to see the running processes -- `tail -f ` to see the logs in real time - -### Close Workloads -- If you want to keep the logs and stop the scripts, - `./shutdown_workloads.sh --do-not-remove-logs` -- If you want to remove the logs and stop the scripts, - `./shutdown_workloads.sh` diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh b/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh deleted file mode 100755 index 836507c4a25d..000000000000 --- a/sdk/cosmos/azure-cosmos/tests/workloads/run_workloads.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash -if [ $# -eq 0 ]; then - echo "Usage: $0 num_runs [operations] [proxy]" - echo " num_runs: number of processes per config" - echo " operations: comma-separated (default: read,write,query)" - echo " proxy: true/false (default: false)" - exit 1 -fi - -num_runs=$1 -operations=${2:-read,write,query} -use_proxy=${3:-false} - -echo "[Info] Installing azure-cosmos package..." -pip install ../../. -if [ $? -ne 0 ]; then - echo "[Error] Failed to install azure-cosmos. Exiting." - exit 2 -fi -echo "[Info] azure-cosmos installed successfully." - -echo "[Info] Starting $num_runs processes: operations=$operations proxy=$use_proxy" - -for (( i=0; i /dev/null 2>&1 & - sleep 1 -done - -echo "[Info] All workloads started." From 024bc442cd0b5ebc0812ee954a55c9a4bcebc1d7 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:56:22 -0700 Subject: [PATCH 09/15] =?UTF-8?q?fix(workloads):=20correct=20hdrhistogram?= =?UTF-8?q?=20import=20=E2=80=94=20module=20is=20hdrh=20not=20hdrhistogram?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pip package is 'hdrhistogram' but the Python module is 'hdrh'. Import changed from 'from hdrhistogram import HdrHistogram' to 'from hdrh.histogram import HdrHistogram'. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py index 6f8d05ae38fd..8da6ef205b13 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -6,11 +6,11 @@ import time try: - from hdrhistogram import HdrHistogram + from hdrh.histogram import HdrHistogram except ImportError: raise ImportError( "hdrhistogram is required for perf_stats. " - "Install it with: pip install hdrhistogram" + "Install it with: pip install hdrhistogram (module name: hdrh)" ) From 63ae1a818f534a5c98b1a0e46704df86661f30b3 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 09:57:04 -0700 Subject: [PATCH 10/15] fix(workloads): use get_mean_value() for hdrh API Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py index 8da6ef205b13..1bf736061bb8 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -79,7 +79,7 @@ def drain_all(self) -> tuple[list[dict], list[dict]]: "errors": errors, "min_ms": hist.min_value / 1000.0, "max_ms": hist.max_value / 1000.0, - "mean_ms": hist.mean_value / 1000.0, + "mean_ms": hist.get_mean_value() / 1000.0, "p50_ms": hist.get_value_at_percentile(50.0) / 1000.0, "p90_ms": hist.get_value_at_percentile(90.0) / 1000.0, "p99_ms": hist.get_value_at_percentile(99.0) / 1000.0, From 52a39567d1237ecc7013fd5ec193b7c157758676 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 14:22:09 -0700 Subject: [PATCH 11/15] style(workloads): fix black formatting and setup_env.sh references Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/perf_config.py | 4 +- .../tests/workloads/perf_reporter.py | 21 ++- .../tests/workloads/perf_stats.py | 80 +++++---- .../azure-cosmos/tests/workloads/setup_env.sh | 4 +- .../azure-cosmos/tests/workloads/workload.py | 41 +++-- .../tests/workloads/workload_configs.py | 34 +++- .../tests/workloads/workload_utils.py | 165 ++++++++++++------ 7 files changed, 239 insertions(+), 110 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py index 1c60a7b779d8..28959f74c863 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py @@ -12,7 +12,9 @@ def _get_git_sha() -> str: try: result = subprocess.run( ["git", "rev-parse", "--short", "HEAD"], - capture_output=True, text=True, timeout=5 + capture_output=True, + text=True, + timeout=5, ) if result.returncode == 0: return result.stdout.strip() diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py index 6e1b2606a3dd..cf89c4079c3d 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py @@ -20,6 +20,7 @@ def _get_sdk_version() -> str: """Get the azure-cosmos SDK version string.""" try: from azure.cosmos import __version__ + return __version__ except Exception: return "unknown" @@ -80,10 +81,15 @@ def __init__(self, stats: Stats, config: dict): def start(self): """Start the background reporting thread (daemon).""" - self._thread = threading.Thread(target=self._run, daemon=True, name="perf-reporter") + self._thread = threading.Thread( + target=self._run, daemon=True, name="perf-reporter" + ) self._thread.start() - logger.info("PerfReporter started (interval=%ds, workload_id=%s)", - self._config["report_interval"], self._config["workload_id"]) + logger.info( + "PerfReporter started (interval=%ds, workload_id=%s)", + self._config["report_interval"], + self._config["workload_id"], + ) def stop(self): """Stop the reporter and flush final results.""" @@ -152,7 +158,10 @@ def _flush(self): concurrency = _safe_int_env("COSMOS_CONCURRENT_REQUESTS", 100) preferred = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "") excluded = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "") - ppcb = os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" + ppcb = ( + os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() + == "true" + ) summaries, errors = self._stats.drain_all() for s in summaries: @@ -187,7 +196,9 @@ def _flush(self): try: self._container.upsert_item(doc) except Exception as e: - logger.warning("PerfReporter upsert failed for %s: %s", s["operation"], e) + logger.warning( + "PerfReporter upsert failed for %s: %s", s["operation"], e + ) for err in errors: doc = { diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py index 1bf736061bb8..6e7d61644837 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -38,22 +38,30 @@ def record(self, operation: str, duration_ms: float): # Record in microseconds for sub-ms precision self._histograms[operation].record_value(max(1, int(duration_ms * 1000))) - def record_error(self, operation: str, error_msg: str, traceback_str: str, - status_code: int = None, sub_status_code: int = None): + def record_error( + self, + operation: str, + error_msg: str, + traceback_str: str, + status_code: int = None, + sub_status_code: int = None, + ): """Record a failed operation with error details.""" with self._lock: if operation not in self._error_counts: self._error_counts[operation] = 0 self._histograms[operation] = HdrHistogram(1, 60_000_000, 3) self._error_counts[operation] += 1 - self._errors.append({ - "operation": operation, - "error_message": error_msg, - "source_message": traceback_str, - "error_status_code": status_code, - "error_sub_status_code": sub_status_code, - "timestamp": time.time(), - }) + self._errors.append( + { + "operation": operation, + "error_message": error_msg, + "source_message": traceback_str, + "error_status_code": status_code, + "error_sub_status_code": sub_status_code, + "timestamp": time.time(), + } + ) def drain_all(self) -> tuple[list[dict], list[dict]]: """Atomically drain both summaries and error details under one lock. @@ -65,7 +73,9 @@ def drain_all(self) -> tuple[list[dict], list[dict]]: """ with self._lock: summaries = [] - all_ops = set(list(self._histograms.keys()) + list(self._error_counts.keys())) + all_ops = set( + list(self._histograms.keys()) + list(self._error_counts.keys()) + ) for op in sorted(all_ops): hist = self._histograms.get(op) errors = self._error_counts.get(op, 0) @@ -73,29 +83,33 @@ def drain_all(self) -> tuple[list[dict], list[dict]]: if count == 0 and errors == 0: continue if count > 0: - summaries.append({ - "operation": op, - "count": count, - "errors": errors, - "min_ms": hist.min_value / 1000.0, - "max_ms": hist.max_value / 1000.0, - "mean_ms": hist.get_mean_value() / 1000.0, - "p50_ms": hist.get_value_at_percentile(50.0) / 1000.0, - "p90_ms": hist.get_value_at_percentile(90.0) / 1000.0, - "p99_ms": hist.get_value_at_percentile(99.0) / 1000.0, - }) + summaries.append( + { + "operation": op, + "count": count, + "errors": errors, + "min_ms": hist.min_value / 1000.0, + "max_ms": hist.max_value / 1000.0, + "mean_ms": hist.get_mean_value() / 1000.0, + "p50_ms": hist.get_value_at_percentile(50.0) / 1000.0, + "p90_ms": hist.get_value_at_percentile(90.0) / 1000.0, + "p99_ms": hist.get_value_at_percentile(99.0) / 1000.0, + } + ) else: - summaries.append({ - "operation": op, - "count": 0, - "errors": errors, - "min_ms": 0.0, - "max_ms": 0.0, - "mean_ms": 0.0, - "p50_ms": 0.0, - "p90_ms": 0.0, - "p99_ms": 0.0, - }) + summaries.append( + { + "operation": op, + "count": 0, + "errors": errors, + "min_ms": 0.0, + "max_ms": 0.0, + "mean_ms": 0.0, + "p50_ms": 0.0, + "p90_ms": 0.0, + "p99_ms": 0.0, + } + ) # Reset for next interval self._histograms.clear() self._error_counts.clear() diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/setup_env.sh b/sdk/cosmos/azure-cosmos/tests/workloads/setup_env.sh index 8337db1cfc41..b90c08529be3 100755 --- a/sdk/cosmos/azure-cosmos/tests/workloads/setup_env.sh +++ b/sdk/cosmos/azure-cosmos/tests/workloads/setup_env.sh @@ -70,8 +70,8 @@ Manual steps remaining: cd ../tests/workloads python3 initial-setup.py 5. Run scale workloads: - ./run_workloads.sh + WORKLOAD_OPERATIONS=read,write,query python3 workload.py -Refer to dev.md for more details. +See cosmos-sdk-copilot-toolkit repo for deployment docs. EOF echo "[Step 6] Manual steps: completed." diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py index 115afc8397a5..a08721fb3123 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py @@ -69,11 +69,17 @@ async def run_workload_async(client_id, client_logger): while True: try: if "write" in ops: - await upsert_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await upsert_item_concurrently( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats + ) if "read" in ops: - await read_item_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + await read_item_concurrently( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats + ) if "query" in ops: - await query_items_concurrently(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + await query_items_concurrently( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats + ) except Exception as e: client_logger.info("Exception in application layer") client_logger.error(e) @@ -100,13 +106,16 @@ def run_workload_sync(client_id, client_logger): connection_policy = documents.ConnectionPolicy() connection_policy.UseMultipleWriteLocations = USE_MULTIPLE_WRITABLE_LOCATIONS - with SyncClient(COSMOS_URI, COSMOS_CREDENTIAL, - connection_policy=connection_policy, - preferred_locations=PREFERRED_LOCATIONS, - excluded_locations=CLIENT_EXCLUDED_LOCATIONS, - enable_diagnostics_logging=True, - logger=client_logger, - user_agent=get_user_agent(client_id)) as client: + with SyncClient( + COSMOS_URI, + COSMOS_CREDENTIAL, + connection_policy=connection_policy, + preferred_locations=PREFERRED_LOCATIONS, + excluded_locations=CLIENT_EXCLUDED_LOCATIONS, + enable_diagnostics_logging=True, + logger=client_logger, + user_agent=get_user_agent(client_id), + ) as client: db = client.get_database_client(COSMOS_DATABASE) cont = db.get_container_client(COSMOS_CONTAINER) time.sleep(1) @@ -114,11 +123,17 @@ def run_workload_sync(client_id, client_logger): while True: try: if "write" in ops: - upsert_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + upsert_item( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats + ) if "read" in ops: - read_item(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats) + read_item( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_REQUESTS, stats + ) if "query" in ops: - query_items(cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats) + query_items( + cont, REQUEST_EXCLUDED_LOCATIONS, CONCURRENT_QUERIES, stats + ) except Exception as e: client_logger.info("Exception in application layer") client_logger.error(e) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index f1e81c09e12f..0cb478eff68f 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -6,9 +6,21 @@ from azure.identity import DefaultAzureCredential -PREFERRED_LOCATIONS = os.environ.get("COSMOS_PREFERRED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_PREFERRED_LOCATIONS") else [] -CLIENT_EXCLUDED_LOCATIONS = os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS") else [] -REQUEST_EXCLUDED_LOCATIONS = os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS", "").split(",") if os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS") else [] +PREFERRED_LOCATIONS = ( + os.environ.get("COSMOS_PREFERRED_LOCATIONS", "").split(",") + if os.environ.get("COSMOS_PREFERRED_LOCATIONS") + else [] +) +CLIENT_EXCLUDED_LOCATIONS = ( + os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "").split(",") + if os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS") + else [] +) +REQUEST_EXCLUDED_LOCATIONS = ( + os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS", "").split(",") + if os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS") + else [] +) COSMOS_PROXY_URI = os.environ.get("COSMOS_PROXY_URI", "0.0.0.0") COSMOS_URI = os.environ.get("COSMOS_URI", "") COSMOS_KEY = os.environ.get("COSMOS_KEY", "") @@ -18,12 +30,18 @@ USER_AGENT_PREFIX = os.environ.get("COSMOS_USER_AGENT_PREFIX", "") LOG_LEVEL = getattr(logging, os.environ.get("COSMOS_LOG_LEVEL", "DEBUG"), logging.DEBUG) APP_INSIGHTS_CONNECTION_STRING = os.environ.get("APP_INSIGHTS_CONNECTION_STRING", "") -CIRCUIT_BREAKER_ENABLED = os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" -USE_MULTIPLE_WRITABLE_LOCATIONS = os.environ.get("COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS", "false").lower() == "true" +CIRCUIT_BREAKER_ENABLED = ( + os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" +) +USE_MULTIPLE_WRITABLE_LOCATIONS = ( + os.environ.get("COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS", "false").lower() == "true" +) CONCURRENT_REQUESTS = int(os.environ.get("COSMOS_CONCURRENT_REQUESTS", "100")) CONCURRENT_QUERIES = int(os.environ.get("COSMOS_CONCURRENT_QUERIES", "2")) PARTITION_KEY = os.environ.get("COSMOS_PARTITION_KEY", "id") -NUMBER_OF_LOGICAL_PARTITIONS = int(os.environ.get("COSMOS_NUMBER_OF_LOGICAL_PARTITIONS", "10000")) +NUMBER_OF_LOGICAL_PARTITIONS = int( + os.environ.get("COSMOS_NUMBER_OF_LOGICAL_PARTITIONS", "10000") +) THROUGHPUT = int(os.environ.get("COSMOS_THROUGHPUT", "1000000")) # Workload behavior @@ -35,7 +53,9 @@ ) _unknown_ops = WORKLOAD_OPERATIONS - _VALID_OPERATIONS if _unknown_ops: - raise ValueError(f"Unknown WORKLOAD_OPERATIONS: {_unknown_ops}. Valid: {_VALID_OPERATIONS}") + raise ValueError( + f"Unknown WORKLOAD_OPERATIONS: {_unknown_ops}. Valid: {_VALID_OPERATIONS}" + ) WORKLOAD_USE_PROXY = os.environ.get("WORKLOAD_USE_PROXY", "false").lower() == "true" WORKLOAD_USE_SYNC = os.environ.get("WORKLOAD_USE_SYNC", "false").lower() == "true" diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py index a2e65edbda7d..d3b9b538f626 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py @@ -18,12 +18,21 @@ _NOISY_ERRORS = set([404, 409, 412]) _NOISY_SUB_STATUS_CODES = set([0, None]) -_REQUIRED_ATTRIBUTES = ["resource_type", "verb", "operation_type", "status_code", "sub_status_code", "duration"] +_REQUIRED_ATTRIBUTES = [ + "resource_type", + "verb", + "operation_type", + "status_code", + "sub_status_code", + "duration", +] + def get_user_agent(client_id): prefix = USER_AGENT_PREFIX + "-" if USER_AGENT_PREFIX else "" return prefix + str(client_id) + "-" + datetime.now().strftime("%Y%m%d-%H%M%S") + def get_existing_random_item(): random_int = random.randint(0, NUMBER_OF_LOGICAL_PARTITIONS) item = create_random_item() @@ -31,6 +40,7 @@ def get_existing_random_item(): item["pk"] = "pk-" + str(random_int) return item + def create_random_item(): paragraph1 = ( "Lorem ipsum dolor sit amet, consectetur adipiscing elit. " @@ -49,21 +59,24 @@ def create_random_item(): "value": random.randint(1, 1000000000), "timestamp": datetime.utcnow().isoformat() + "Z", "flag": random.choice([True, False]), - "description": paragraph1 + "\n\n" + paragraph2 + "description": paragraph1 + "\n\n" + paragraph2, } + def _get_upsert_item(): # 10 percent of the time, create a new item instead of updating an existing one return create_random_item() if random.random() < 0.1 else get_existing_random_item() + def _record_error(stats, operation, error): """Extract Cosmos status codes and record the error in stats.""" status_code = sub_status_code = None if isinstance(error, CosmosHttpResponseError): status_code = error.status_code - sub_status_code = getattr(error, 'sub_status', None) - stats.record_error(operation, str(error), traceback.format_exc(), - status_code, sub_status_code) + sub_status_code = getattr(error, "sub_status", None) + stats.record_error( + operation, str(error), traceback.format_exc(), status_code, sub_status_code + ) def upsert_item(container, excluded_locations, num_upserts, stats=None): @@ -72,8 +85,12 @@ def upsert_item(container, excluded_locations, num_upserts, stats=None): start = time.perf_counter_ns() try: if excluded_locations: - container.upsert_item(item, etag=None, match_condition=None, - excluded_locations=excluded_locations) + container.upsert_item( + item, + etag=None, + match_condition=None, + excluded_locations=excluded_locations, + ) else: container.upsert_item(item, etag=None, match_condition=None) if stats: @@ -90,10 +107,17 @@ def read_item(container, excluded_locations, num_reads, stats=None): start = time.perf_counter_ns() try: if excluded_locations: - container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, - excluded_locations=excluded_locations) + container.read_item( + item["id"], + item[PARTITION_KEY], + etag=None, + match_condition=None, + excluded_locations=excluded_locations, + ) else: - container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) + container.read_item( + item["id"], item[PARTITION_KEY], etag=None, match_condition=None + ) if stats: stats.record("ReadItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: @@ -101,6 +125,7 @@ def read_item(container, excluded_locations, num_reads, stats=None): _record_error(stats, "ReadItem", e) raise + def query_items(container, excluded_locations, num_queries, stats=None): for _ in range(num_queries): perform_query(container, excluded_locations, stats) @@ -111,16 +136,24 @@ def perform_query(container, excluded_locations, stats=None): start = time.perf_counter_ns() try: if excluded_locations: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY], - excluded_locations=excluded_locations) + results = container.query_items( + query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[ + {"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}, + ], + partition_key=random_item[PARTITION_KEY], + excluded_locations=excluded_locations, + ) else: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY]) + results = container.query_items( + query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[ + {"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}, + ], + partition_key=random_item[PARTITION_KEY], + ) items = [item for item in results] if stats: stats.record("QueryItems", (time.perf_counter_ns() - start) / 1_000_000) @@ -135,8 +168,12 @@ async def _timed_upsert_async(container, item, excluded_locations, stats): start = time.perf_counter_ns() try: if excluded_locations: - await container.upsert_item(item, etag=None, match_condition=None, - excluded_locations=excluded_locations) + await container.upsert_item( + item, + etag=None, + match_condition=None, + excluded_locations=excluded_locations, + ) else: await container.upsert_item(item, etag=None, match_condition=None) if stats: @@ -152,10 +189,17 @@ async def _timed_read_async(container, item, excluded_locations, stats): start = time.perf_counter_ns() try: if excluded_locations: - await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None, - excluded_locations=excluded_locations) + await container.read_item( + item["id"], + item[PARTITION_KEY], + etag=None, + match_condition=None, + excluded_locations=excluded_locations, + ) else: - await container.read_item(item["id"], item[PARTITION_KEY], etag=None, match_condition=None) + await container.read_item( + item["id"], item[PARTITION_KEY], etag=None, match_condition=None + ) if stats: stats.record("ReadItem", (time.perf_counter_ns() - start) / 1_000_000) except Exception as e: @@ -169,16 +213,24 @@ async def _timed_query_async(container, random_item, excluded_locations, stats): start = time.perf_counter_ns() try: if excluded_locations: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY], - excluded_locations=excluded_locations) + results = container.query_items( + query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[ + {"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}, + ], + partition_key=random_item[PARTITION_KEY], + excluded_locations=excluded_locations, + ) else: - results = container.query_items(query="SELECT * FROM c where c.id=@id and c.pk=@pk", - parameters=[{"name": "@id", "value": random_item["id"]}, - {"name": "@pk", "value": random_item["pk"]}], - partition_key=random_item[PARTITION_KEY]) + results = container.query_items( + query="SELECT * FROM c where c.id=@id and c.pk=@pk", + parameters=[ + {"name": "@id", "value": random_item["id"]}, + {"name": "@pk", "value": random_item["pk"]}, + ], + partition_key=random_item[PARTITION_KEY], + ) items = [item async for item in results] if stats: stats.record("QueryItems", (time.perf_counter_ns() - start) / 1_000_000) @@ -188,7 +240,9 @@ async def _timed_query_async(container, random_item, excluded_locations, stats): raise -async def upsert_item_concurrently(container, excluded_locations, num_upserts, stats=None): +async def upsert_item_concurrently( + container, excluded_locations, num_upserts, stats=None +): tasks = [] for _ in range(num_upserts): item = _get_upsert_item() @@ -204,24 +258,32 @@ async def read_item_concurrently(container, excluded_locations, num_reads, stats await asyncio.gather(*tasks) -async def query_items_concurrently(container, excluded_locations, num_queries, stats=None): +async def query_items_concurrently( + container, excluded_locations, num_queries, stats=None +): tasks = [] for _ in range(num_queries): random_item = get_existing_random_item() - tasks.append(_timed_query_async(container, random_item, excluded_locations, stats)) + tasks.append( + _timed_query_async(container, random_item, excluded_locations, stats) + ) await asyncio.gather(*tasks) + def create_custom_session(): - proxied_connector = ProxiedTCPConnector(proxy_host=COSMOS_PROXY_URI, - proxy_port=5100, - limit=100, # Max total open connections - limit_per_host=10, # Max per Cosmos DB host - keepalive_timeout=30, # Keep-alive duration for idle connections - enable_cleanup_closed=True) # Helpful for TLS/FIN issues + proxied_connector = ProxiedTCPConnector( + proxy_host=COSMOS_PROXY_URI, + proxy_port=5100, + limit=100, # Max total open connections + limit_per_host=10, # Max per Cosmos DB host + keepalive_timeout=30, # Keep-alive duration for idle connections + enable_cleanup_closed=True, + ) # Helpful for TLS/FIN issues session = ClientSession(connector=proxied_connector) return session + def create_logger(file_name): os.environ["AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER"] = str(CIRCUIT_BREAKER_ENABLED) logger = logging.getLogger() @@ -233,9 +295,9 @@ def create_logger(file_name): prefix = os.path.splitext(file_name)[0] + "-" + str(os.getpid()) # Create a rotating file handler handler = RotatingFileHandler( - "log-" + get_user_agent(prefix) + '.log', + "log-" + get_user_agent(prefix) + ".log", maxBytes=1024 * 1024 * 10, # 10 mb - backupCount=5 + backupCount=5, ) logger.setLevel(LOG_LEVEL) # create filters for the logger handler to reduce the noise @@ -244,14 +306,13 @@ def create_logger(file_name): logger.addHandler(handler) return prefix, logger + def create_inner_logger(file_name="internal_logger_tues"): logger = logging.getLogger("internal_requests") prefix = os.path.splitext(file_name)[0] + "-" + str(os.getpid()) # Create a rotating file handler handler = RotatingFileHandler( - "log-" + file_name + '.log', - maxBytes=1024 * 1024 * 10, # 10 mb - backupCount=5 + "log-" + file_name + ".log", maxBytes=1024 * 1024 * 10, backupCount=5 # 10 mb ) logger.setLevel(LOG_LEVEL) logger.addHandler(handler) @@ -268,11 +329,17 @@ def filter(self, record): if all(hasattr(record, attr) for attr in _REQUIRED_ATTRIBUTES): # Check the conditions # Check database account reads - if record.resource_type == "databaseaccount" and record.verb == "GET" and record.operation_type == "Read": + if ( + record.resource_type == "databaseaccount" + and record.verb == "GET" + and record.operation_type == "Read" + ): return True # Check if there is an error and omit noisy errors if record.status_code >= 400 and not ( - record.status_code in _NOISY_ERRORS and record.sub_status_code in _NOISY_SUB_STATUS_CODES): + record.status_code in _NOISY_ERRORS + and record.sub_status_code in _NOISY_SUB_STATUS_CODES + ): return True # Check if the latency (duration) was above 1000 ms if record.duration >= 1000: From 12b4a2026a01253b8f09bc19b868a2d7ba0b7a70 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 14:39:05 -0700 Subject: [PATCH 12/15] feat(workloads): add config_multi_write_enabled to PerfResult Reports COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS in the config snapshot so it's visible in the Grafana dashboard and queryable from Kusto. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py index cf89c4079c3d..f1e7d2f74a95 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py @@ -192,6 +192,7 @@ def _flush(self): "config_application_region": preferred, "config_excluded_regions": excluded, "config_ppcb_enabled": ppcb, + "config_multi_write_enabled": multi_write, } try: self._container.upsert_item(doc) From f647d5433f7d2a9b8c008fe4d874389df8488d9b Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 15:16:06 -0700 Subject: [PATCH 13/15] fix(workloads): define multi_write variable in perf_reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The variable was used but never defined — caused pylint E0602. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py index f1e7d2f74a95..78b35ce596d4 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_reporter.py @@ -162,6 +162,10 @@ def _flush(self): os.environ.get("AZURE_COSMOS_ENABLE_CIRCUIT_BREAKER", "false").lower() == "true" ) + multi_write = ( + os.environ.get("COSMOS_USE_MULTIPLE_WRITABLE_LOCATIONS", "false").lower() + == "true" + ) summaries, errors = self._stats.drain_all() for s in summaries: From b595bd90104481b8c800ecbc76f15e2fe7476830 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Fri, 10 Apr 2026 15:42:57 -0700 Subject: [PATCH 14/15] =?UTF-8?q?fix(workloads):=20address=20review=20find?= =?UTF-8?q?ings=20=E2=80=94=20lazy=20imports,=20session=20close,=20histogr?= =?UTF-8?q?am=20clamp,=20safe=20parsing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../tests/workloads/perf_config.py | 11 +++- .../tests/workloads/perf_stats.py | 17 ++++-- .../azure-cosmos/tests/workloads/workload.py | 56 ++++++++++++++----- .../tests/workloads/workload_configs.py | 28 +++++----- .../tests/workloads/workload_utils.py | 6 +- 5 files changed, 82 insertions(+), 36 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py index 28959f74c863..9e12a6603487 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_config.py @@ -23,6 +23,13 @@ def _get_git_sha() -> str: return "unknown" +def _safe_int(value: object, default: int) -> int: + try: + return int(value) + except (ValueError, TypeError): + return default + + def get_perf_config() -> dict: """Build performance reporter configuration from environment variables.""" return { @@ -30,7 +37,9 @@ def get_perf_config() -> dict: "results_endpoint": os.environ.get("RESULTS_COSMOS_URI", ""), "results_database": os.environ.get("RESULTS_COSMOS_DATABASE", "perfdb"), "results_container": os.environ.get("RESULTS_COSMOS_CONTAINER", "perfresults"), - "report_interval": int(os.environ.get("PERF_REPORT_INTERVAL", "300")), + "report_interval": _safe_int( + os.environ.get("PERF_REPORT_INTERVAL", "300"), 300 + ), "workload_id": os.environ.get("PERF_WORKLOAD_ID", str(uuid.uuid4())), "commit_sha": os.environ.get("PERF_COMMIT_SHA", _get_git_sha()), } diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py index 6e7d61644837..74c963666295 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/perf_stats.py @@ -14,6 +14,10 @@ ) +_MIN_VALUE_US = 1 +_MAX_VALUE_US = 60_000_000 + + class Stats: """Thread-safe per-operation latency and error tracking using HdrHistogram. @@ -33,10 +37,13 @@ def record(self, operation: str, duration_ms: float): """Record a successful operation with its duration in milliseconds.""" with self._lock: if operation not in self._histograms: - self._histograms[operation] = HdrHistogram(1, 60_000_000, 3) + self._histograms[operation] = HdrHistogram( + _MIN_VALUE_US, _MAX_VALUE_US, 3 + ) self._error_counts[operation] = 0 - # Record in microseconds for sub-ms precision - self._histograms[operation].record_value(max(1, int(duration_ms * 1000))) + # Clamp to histogram range to prevent crashes on very slow operations + value_us = max(_MIN_VALUE_US, min(int(duration_ms * 1000), _MAX_VALUE_US)) + self._histograms[operation].record_value(value_us) def record_error( self, @@ -50,7 +57,9 @@ def record_error( with self._lock: if operation not in self._error_counts: self._error_counts[operation] = 0 - self._histograms[operation] = HdrHistogram(1, 60_000_000, 3) + self._histograms[operation] = HdrHistogram( + _MIN_VALUE_US, _MAX_VALUE_US, 3 + ) self._error_counts[operation] += 1 self._errors.append( { diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py index a08721fb3123..d6e5b8e1e020 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload.py @@ -9,6 +9,7 @@ WORKLOAD_USE_SYNC use sync client instead of async (default: false) """ +import logging import os import asyncio import time @@ -19,9 +20,6 @@ from workload_utils import * from workload_configs import * -from perf_config import get_perf_config -from perf_stats import Stats -from perf_reporter import PerfReporter async def run_workload_async(client_id, client_logger): @@ -29,15 +27,27 @@ async def run_workload_async(client_id, client_logger): ops = WORKLOAD_OPERATIONS use_proxy = WORKLOAD_USE_PROXY - stats = Stats() - perf_config = get_perf_config() + stats = None + perf_config = None reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() try: - transport = None + from perf_config import get_perf_config + + perf_config = get_perf_config() + if perf_config["enabled"] and perf_config["results_endpoint"]: + from perf_stats import Stats + from perf_reporter import PerfReporter + + stats = Stats() + reporter = PerfReporter(stats, perf_config) + reporter.start() + except ImportError as e: + logging.getLogger(__name__).info("Perf reporting disabled: %s", e) + + session = None + transport = None + try: if use_proxy: session = create_custom_session() transport = AioHttpTransport(session=session, session_owner=False) @@ -88,19 +98,35 @@ async def run_workload_async(client_id, client_logger): await client.__aexit__(None, None, None) finally: if reporter: - reporter.stop() + try: + reporter.stop() + except Exception: + pass + if session: + await session.close() def run_workload_sync(client_id, client_logger): """Sync workload loop — used when WORKLOAD_USE_SYNC=true.""" ops = WORKLOAD_OPERATIONS - stats = Stats() - perf_config = get_perf_config() + stats = None + perf_config = None reporter = None - if perf_config["enabled"] and perf_config["results_endpoint"]: - reporter = PerfReporter(stats, perf_config) - reporter.start() + + try: + from perf_config import get_perf_config + + perf_config = get_perf_config() + if perf_config["enabled"] and perf_config["results_endpoint"]: + from perf_stats import Stats + from perf_reporter import PerfReporter + + stats = Stats() + reporter = PerfReporter(stats, perf_config) + reporter.start() + except ImportError as e: + logging.getLogger(__name__).info("Perf reporting disabled: %s", e) try: connection_policy = documents.ConnectionPolicy() diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py index 0cb478eff68f..c33fb17ab1cd 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_configs.py @@ -6,21 +6,19 @@ from azure.identity import DefaultAzureCredential -PREFERRED_LOCATIONS = ( - os.environ.get("COSMOS_PREFERRED_LOCATIONS", "").split(",") - if os.environ.get("COSMOS_PREFERRED_LOCATIONS") - else [] -) -CLIENT_EXCLUDED_LOCATIONS = ( - os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS", "").split(",") - if os.environ.get("COSMOS_CLIENT_EXCLUDED_LOCATIONS") - else [] -) -REQUEST_EXCLUDED_LOCATIONS = ( - os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS", "").split(",") - if os.environ.get("COSMOS_REQUEST_EXCLUDED_LOCATIONS") - else [] -) + +def _parse_region_list(env_var_name): + value = os.environ.get(env_var_name, "") + return ( + [region.strip() for region in value.split(",") if region.strip()] + if value + else [] + ) + + +PREFERRED_LOCATIONS = _parse_region_list("COSMOS_PREFERRED_LOCATIONS") +CLIENT_EXCLUDED_LOCATIONS = _parse_region_list("COSMOS_CLIENT_EXCLUDED_LOCATIONS") +REQUEST_EXCLUDED_LOCATIONS = _parse_region_list("COSMOS_REQUEST_EXCLUDED_LOCATIONS") COSMOS_PROXY_URI = os.environ.get("COSMOS_PROXY_URI", "0.0.0.0") COSMOS_URI = os.environ.get("COSMOS_URI", "") COSMOS_KEY = os.environ.get("COSMOS_KEY", "") diff --git a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py index d3b9b538f626..6ff2a2d78f39 100644 --- a/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py +++ b/sdk/cosmos/azure-cosmos/tests/workloads/workload_utils.py @@ -70,6 +70,8 @@ def _get_upsert_item(): def _record_error(stats, operation, error): """Extract Cosmos status codes and record the error in stats.""" + if not stats: + return status_code = sub_status_code = None if isinstance(error, CosmosHttpResponseError): status_code = error.status_code @@ -312,7 +314,9 @@ def create_inner_logger(file_name="internal_logger_tues"): prefix = os.path.splitext(file_name)[0] + "-" + str(os.getpid()) # Create a rotating file handler handler = RotatingFileHandler( - "log-" + file_name + ".log", maxBytes=1024 * 1024 * 10, backupCount=5 # 10 mb + "log-" + file_name + ".log", + maxBytes=1024 * 1024 * 10, # 10 mb + backupCount=5, ) logger.setLevel(LOG_LEVEL) logger.addHandler(handler) From a13899757e18bea10fcb642b3d9100d2d77d65d5 Mon Sep 17 00:00:00 2001 From: tvaron3 Date: Sat, 11 Apr 2026 18:36:01 -0700 Subject: [PATCH 15/15] fix(workloads): add perfdb, perfresults, ppcb, hdrh to cosmos cspell dictionary Move cspell words to sdk/cosmos/azure-cosmos/cspell.json instead of root .vscode/cspell.json to keep changes within cosmos folder scope. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/cspell.json | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/cspell.json diff --git a/sdk/cosmos/azure-cosmos/cspell.json b/sdk/cosmos/azure-cosmos/cspell.json new file mode 100644 index 000000000000..5eb68afe9de1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/cspell.json @@ -0,0 +1,9 @@ +{ + "ignoreWords": [ + "perfdb", + "perfresults", + "ppcb", + "hdrh", + "hdrhistogram" + ] +}