From 94fafc48e15ec581cb1ae234204ba76f73a56e6d Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Wed, 25 Mar 2026 10:19:23 -0700 Subject: [PATCH 1/2] Add list_pending_activities sample: find workflows with pending activities --- list_pending_activities/README.md | 112 +++++++++++ list_pending_activities/__init__.py | 0 list_pending_activities/find_pending.py | 247 ++++++++++++++++++++++++ 3 files changed, 359 insertions(+) create mode 100644 list_pending_activities/README.md create mode 100644 list_pending_activities/__init__.py create mode 100644 list_pending_activities/find_pending.py diff --git a/list_pending_activities/README.md b/list_pending_activities/README.md new file mode 100644 index 00000000..58d5b43d --- /dev/null +++ b/list_pending_activities/README.md @@ -0,0 +1,112 @@ +# List Pending Activities + +A command-line tool that queries a Temporal Cloud namespace to find all workflows with pending activities. Supports optional filters and saves results to a local JSON file. + +## How it works + +1. Builds a [visibility query](https://docs.temporal.io/visibility#list-filter) from the optional filters you provide +2. Calls `client.list_workflows()` to retrieve matching workflows +3. Calls `handle.describe()` on each workflow to check for pending activities +4. Prints results to the console and saves them to `output/pending_activities_.json` + +Both parent and child workflows are found — child workflows are independent executions in the visibility store and are queried the same way. + +## Authentication + +The script supports two auth modes. If `TEMPORAL_API_KEY` is set, it uses API key auth via the regional endpoint. Otherwise it falls back to mTLS via the namespace endpoint. + +**API key:** +```bash +export TEMPORAL_API_KEY="your-api-key" +python find_pending.py +``` + +**mTLS (default):** +```bash +python find_pending.py +``` + +Requires `client.pem` and `client.key` in the certs directory. + +### Environment variables + +| Variable | Default | Description | +|---|---|---| +| `TEMPORAL_API_KEY` | (not set) | API key for auth. If set, uses the regional API endpoint. | +| `TEMPORAL_NAMESPACE` | `deepika-test-namespace.a2dd6` | Namespace to query. | +| `TEMPORAL_ADDRESS` | Regional or namespace endpoint | Overrides the target host for either auth mode. | +| `TEMPORAL_CERTS_DIR` | `/Users/deepikaawasthi/temporal/temporal-certs` | Directory containing `client.pem` and `client.key` for mTLS. | + +## Usage + +All flags are optional — use any combination to narrow the search. + +```bash +# No filters — scans all workflows in the namespace +python find_pending.py + +# Filter by task queue +python find_pending.py --task-queue my-queue + +# Filter by workflow type +python find_pending.py --workflow-type MyWorkflow + +# Filter by execution status +python find_pending.py --status Running + +# Filter by start time range +python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z" + +# Filter by close time range +python find_pending.py --close-time-after "2026-03-20T00:00:00Z" --close-time-before "2026-03-25T00:00:00Z" + +# Combine any filters +python find_pending.py --task-queue my-queue --workflow-type MyWorkflow --status Running --start-time-after "2026-03-20T00:00:00Z" +``` + +### Available filters + +| Flag | Visibility Query | Description | +|---|---|---| +| `--task-queue` | `TaskQueue="..."` | Filter by task queue name | +| `--workflow-type` | `WorkflowType="..."` | Filter by workflow type name | +| `--status` | `ExecutionStatus="..."` | Filter by status: `Running`, `Completed`, `Failed`, `Canceled`, `Terminated`, `ContinuedAsNew`, `TimedOut` | +| `--start-time-after` | `StartTime>="..."` | Workflows started at or after this time | +| `--start-time-before` | `StartTime<="..."` | Workflows started at or before this time | +| `--close-time-after` | `CloseTime>="..."` | Workflows closed at or after this time | +| `--close-time-before` | `CloseTime<="..."` | Workflows closed at or before this time | + +All times are in ISO 8601 format (e.g. `2026-03-01T00:00:00Z`). + +## Output + +Results are printed to the console and saved to `output/pending_activities_.json`: + +```json +{ + "generated_at": "2026-03-25T10:04:12.832303", + "query_used": "WorkflowType=\"PendingActivitiesWorkflow\" AND ExecutionStatus=\"Running\"", + "total_workflows": 1, + "workflows": [ + { + "workflow_id": "hello-pending-activities-workflow", + "run_id": "019d25f3-65f4-7c71-9c86-acfb68faec15", + "pending_activity_count": 3, + "pending_activities": [ + { + "activity_id": "1", + "activity_type": "say_hello", + "state": "1", + "attempt": 1 + } + ] + } + ] +} +``` + +## Notes + +- With no filters the script scans **all** workflows in the namespace. Use filters to narrow the scope for large namespaces. +- Only workflows with at least one pending activity appear in the output. +- The `output/` directory is created automatically on first run. diff --git a/list_pending_activities/__init__.py b/list_pending_activities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/list_pending_activities/find_pending.py b/list_pending_activities/find_pending.py new file mode 100644 index 00000000..1dc9a77f --- /dev/null +++ b/list_pending_activities/find_pending.py @@ -0,0 +1,247 @@ +"""Find workflows with pending activities and save results locally. + +All filters are optional — use any combination to narrow the search. + +Authentication: + API key: export TEMPORAL_API_KEY="your-api-key" + mTLS: Falls back to certs if TEMPORAL_API_KEY is not set. + +Usage: + python find_pending.py + python find_pending.py --task-queue my-queue + python find_pending.py --workflow-type MyWorkflow --status Running + python find_pending.py --start-time-after "2026-03-01T00:00:00Z" --start-time-before "2026-03-25T00:00:00Z" + python find_pending.py --close-time-after "2026-03-20T00:00:00Z" +""" + +import argparse +import asyncio +import json +import os +from datetime import datetime + +from temporalio.client import Client +from temporalio.service import TLSConfig + +DEFAULT_NAMESPACE = "deepika-test-namespace.a2dd6" # namespace - . +DEFAULT_API_HOST = "us-east-1.aws.api.temporal.io:7233" # regional endpoint for your namespace +DEFAULT_MTLS_HOST = "deepika-test-namespace.a2dd6.tmprl.cloud:7233" # namespace endpoint for your namespace +DEFAULT_CERTS_DIR = "/Users/deepikaawasthi/temporal/temporal-certs" # certs directory + + +def resolve_api_key() -> str | None: + """Read API key from TEMPORAL_API_KEY env var, or return None to fall back to mTLS.""" + return os.environ.get("TEMPORAL_API_KEY") + + +async def create_client(api_key: str | None = None) -> Client: + namespace = os.environ.get("TEMPORAL_NAMESPACE", DEFAULT_NAMESPACE) + + if api_key: + target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_API_HOST) + print(f"Authenticating with API key to {target_host}") + return await Client.connect( + target_host, + namespace=namespace, + api_key=api_key, + tls=True, + ) + + # Fall back to mTLS + target_host = os.environ.get("TEMPORAL_ADDRESS", DEFAULT_MTLS_HOST) + certs_dir = os.environ.get("TEMPORAL_CERTS_DIR", DEFAULT_CERTS_DIR) + print(f"Authenticating with mTLS to {target_host}") + + with open(os.path.join(certs_dir, "client.pem"), "rb") as f: + client_cert = f.read() + with open(os.path.join(certs_dir, "client.key"), "rb") as f: + client_key = f.read() + + return await Client.connect( + target_host, + namespace=namespace, + tls=TLSConfig( + client_cert=client_cert, + client_private_key=client_key, + ), + ) + + +def build_query( + task_queue: str | None = None, + workflow_type: str | None = None, + status: str | None = None, + start_time_after: str | None = None, + start_time_before: str | None = None, + close_time_after: str | None = None, + close_time_before: str | None = None, +) -> str: + """Build a visibility query from optional filters.""" + clauses = [] + + if task_queue: + clauses.append(f'TaskQueue="{task_queue}"') + if workflow_type: + clauses.append(f'WorkflowType="{workflow_type}"') + if status: + clauses.append(f'ExecutionStatus="{status}"') + if start_time_after: + clauses.append(f'StartTime>="{start_time_after}"') + if start_time_before: + clauses.append(f'StartTime<="{start_time_before}"') + if close_time_after: + clauses.append(f'CloseTime>="{close_time_after}"') + if close_time_before: + clauses.append(f'CloseTime<="{close_time_before}"') + + return " AND ".join(clauses) if clauses else "" + + +async def find_workflows_with_pending_activities( + client: Client, + query: str, +) -> list[dict]: + """List workflows matching the query, describe each, return those with pending activities.""" + + results = [] + + async for wf in client.list_workflows(query=query or None): + handle = client.get_workflow_handle(wf.id, run_id=wf.run_id) + desc = await handle.describe() + + pending = desc.raw_description.pending_activities + if not pending: + continue + + activities_info = [] + for pa in pending: + activities_info.append( + { + "activity_id": pa.activity_id, + "activity_type": pa.activity_type.name, + "state": str(pa.state), + "attempt": pa.attempt, + } + ) + + parent_exec = desc.raw_description.parent_execution + parent_id = parent_exec.workflow_id if parent_exec else None + + results.append( + { + "workflow_id": wf.id, + "run_id": wf.run_id, + "workflow_type": str(getattr(wf, "workflow_type", "")), + "parent_workflow_id": parent_id, + "pending_activity_count": len(pending), + "pending_activities": activities_info, + } + ) + + return results + + +def save_results(results: list[dict], query: str) -> str: + """Save results to a JSON file in the output/ directory. Returns the file path.""" + output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "output") + os.makedirs(output_dir, exist_ok=True) + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filepath = os.path.join(output_dir, f"pending_activities_{timestamp}.json") + + with open(filepath, "w") as f: + json.dump( + { + "generated_at": datetime.now().isoformat(), + "query_used": query, + "total_workflows": len(results), + "workflows": results, + }, + f, + indent=2, + ) + + return filepath + + +def print_results(results: list[dict]) -> None: + print("-" * 80) + for entry in results: + print(f"Workflow ID : {entry['workflow_id']}") + print(f"Run ID : {entry['run_id']}") + print(f"Workflow Type : {entry['workflow_type']}") + print(f"Parent WF ID : {entry['parent_workflow_id'] or '(none — top-level)'}") + print(f"Pending Count : {entry['pending_activity_count']}") + for act in entry["pending_activities"]: + print( + f" - Activity ID: {act['activity_id']}, " + f"Type: {act['activity_type']}, " + f"State: {act['state']}, " + f"Attempt: {act['attempt']}" + ) + print("-" * 80) + + +async def main(): + parser = argparse.ArgumentParser( + description="Find workflows with pending activities. All filters are optional." + ) + parser.add_argument("--task-queue", default=None, help="Filter by task queue name") + parser.add_argument("--workflow-type", default=None, help="Filter by workflow type name") + parser.add_argument( + "--status", + default=None, + choices=["Running", "Completed", "Failed", "Canceled", "Terminated", "ContinuedAsNew", "TimedOut"], + help="Filter by execution status (default: all statuses)", + ) + parser.add_argument( + "--start-time-after", + default=None, + help='Workflows started at or after this time (ISO 8601, e.g. "2026-03-01T00:00:00Z")', + ) + parser.add_argument( + "--start-time-before", + default=None, + help='Workflows started at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")', + ) + parser.add_argument( + "--close-time-after", + default=None, + help='Workflows closed at or after this time (ISO 8601, e.g. "2026-03-20T00:00:00Z")', + ) + parser.add_argument( + "--close-time-before", + default=None, + help='Workflows closed at or before this time (ISO 8601, e.g. "2026-03-25T00:00:00Z")', + ) + args = parser.parse_args() + + query = build_query( + task_queue=args.task_queue, + workflow_type=args.workflow_type, + status=args.status, + start_time_after=args.start_time_after, + start_time_before=args.start_time_before, + close_time_after=args.close_time_after, + close_time_before=args.close_time_before, + ) + + print(f"Query: {query or '(no filters — scanning all workflows)'}\n") + + api_key = resolve_api_key() + client = await create_client(api_key=api_key) + results = await find_workflows_with_pending_activities(client, query) + + if not results: + print("No workflows with pending activities found.") + return + + print(f"Found {len(results)} workflow(s) with pending activities:\n") + print_results(results) + + filepath = save_results(results, query) + print(f"\nResults saved to: {filepath}") + + +if __name__ == "__main__": + asyncio.run(main()) From 59e1f87bbaad7e990d68988875801adf9dc5b2b6 Mon Sep 17 00:00:00 2001 From: Deepika Awasthi Date: Wed, 25 Mar 2026 10:24:31 -0700 Subject: [PATCH 2/2] Update find_pending.py with latest changes --- list_pending_activities/find_pending.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/list_pending_activities/find_pending.py b/list_pending_activities/find_pending.py index 1dc9a77f..67b11e18 100644 --- a/list_pending_activities/find_pending.py +++ b/list_pending_activities/find_pending.py @@ -23,10 +23,10 @@ from temporalio.client import Client from temporalio.service import TLSConfig -DEFAULT_NAMESPACE = "deepika-test-namespace.a2dd6" # namespace - . -DEFAULT_API_HOST = "us-east-1.aws.api.temporal.io:7233" # regional endpoint for your namespace -DEFAULT_MTLS_HOST = "deepika-test-namespace.a2dd6.tmprl.cloud:7233" # namespace endpoint for your namespace -DEFAULT_CERTS_DIR = "/Users/deepikaawasthi/temporal/temporal-certs" # certs directory +DEFAULT_NAMESPACE = "." # namespace - . +DEFAULT_API_HOST = "..api.temporal.io:7233" # regional endpoint for your namespace +DEFAULT_MTLS_HOST = "..tmprl.cloud:7233" # namespace endpoint for your namespace +DEFAULT_CERTS_DIR = "directory path containing client.pem and client.key, keep the name as it is" # certs directory def resolve_api_key() -> str | None: