gh-138122: Implement PEP 799 (#138142)

This commit is contained in:
Pablo Galindo Salgado
2025-08-27 17:52:50 +01:00
committed by GitHub
parent f733e428f8
commit 56eb6b64a0
23 changed files with 497 additions and 386 deletions

13
Lib/profiling/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""Python profiling tools.
This package provides two types of profilers:
- profiling.tracing: Deterministic tracing profiler that instruments every
function call and return. Higher overhead but provides exact call counts
and timing.
- profiling.sampling: Statistical sampling profiler that periodically samples
the call stack. Low overhead and suitable for production use.
"""
__all__ = ("tracing", "sampling")

View File

@@ -0,0 +1,11 @@
"""Statistical sampling profiler for Python.
This module provides low-overhead profiling by periodically sampling the
call stack rather than tracing every function call.
"""
from .collector import Collector
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
__all__ = ("Collector", "PstatsCollector", "CollapsedStackCollector")

View File

@@ -0,0 +1,6 @@
"""Run the sampling profiler from the command line."""
from .sample import main
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,243 @@
"""
Internal synchronization coordinator for the sample profiler.
This module is used internally by the sample profiler to coordinate
the startup of target processes. It should not be called directly by users.
"""
import os
import sys
import socket
import runpy
import time
from typing import List, NoReturn
class CoordinatorError(Exception):
"""Base exception for coordinator errors."""
pass
class ArgumentError(CoordinatorError):
"""Raised when invalid arguments are provided."""
pass
class SyncError(CoordinatorError):
"""Raised when synchronization with profiler fails."""
pass
class TargetError(CoordinatorError):
"""Raised when target execution fails."""
pass
def _validate_arguments(args: List[str]) -> tuple[int, str, List[str]]:
"""
Validate and parse command line arguments.
Args:
args: Command line arguments including script name
Returns:
Tuple of (sync_port, working_directory, target_args)
Raises:
ArgumentError: If arguments are invalid
"""
if len(args) < 4:
raise ArgumentError(
"Insufficient arguments. Expected: <sync_port> <cwd> <target> [args...]"
)
try:
sync_port = int(args[1])
if not (1 <= sync_port <= 65535):
raise ValueError("Port out of range")
except ValueError as e:
raise ArgumentError(f"Invalid sync port '{args[1]}': {e}") from e
cwd = args[2]
if not os.path.isdir(cwd):
raise ArgumentError(f"Working directory does not exist: {cwd}")
target_args = args[3:]
if not target_args:
raise ArgumentError("No target specified")
return sync_port, cwd, target_args
# Constants for socket communication
_MAX_RETRIES = 3
_INITIAL_RETRY_DELAY = 0.1
_SOCKET_TIMEOUT = 2.0
_READY_MESSAGE = b"ready"
def _signal_readiness(sync_port: int) -> None:
"""
Signal readiness to the profiler via TCP socket.
Args:
sync_port: Port number where profiler is listening
Raises:
SyncError: If unable to signal readiness
"""
last_error = None
for attempt in range(_MAX_RETRIES):
try:
# Use context manager for automatic cleanup
with socket.create_connection(("127.0.0.1", sync_port), timeout=_SOCKET_TIMEOUT) as sock:
sock.send(_READY_MESSAGE)
return
except (socket.error, OSError) as e:
last_error = e
if attempt < _MAX_RETRIES - 1:
# Exponential backoff before retry
time.sleep(_INITIAL_RETRY_DELAY * (2 ** attempt))
# If we get here, all retries failed
raise SyncError(f"Failed to signal readiness after {_MAX_RETRIES} attempts: {last_error}") from last_error
def _setup_environment(cwd: str) -> None:
"""
Set up the execution environment.
Args:
cwd: Working directory to change to
Raises:
TargetError: If unable to set up environment
"""
try:
os.chdir(cwd)
except OSError as e:
raise TargetError(f"Failed to change to directory {cwd}: {e}") from e
# Add current directory to sys.path if not present (for module imports)
if cwd not in sys.path:
sys.path.insert(0, cwd)
def _execute_module(module_name: str, module_args: List[str]) -> None:
"""
Execute a Python module.
Args:
module_name: Name of the module to execute
module_args: Arguments to pass to the module
Raises:
TargetError: If module execution fails
"""
# Replace sys.argv to match how Python normally runs modules
# When running 'python -m module args', sys.argv is ["__main__.py", "args"]
sys.argv = [f"__main__.py"] + module_args
try:
runpy.run_module(module_name, run_name="__main__", alter_sys=True)
except ImportError as e:
raise TargetError(f"Module '{module_name}' not found: {e}") from e
except SystemExit:
# SystemExit is normal for modules
pass
except Exception as e:
raise TargetError(f"Error executing module '{module_name}': {e}") from e
def _execute_script(script_path: str, script_args: List[str], cwd: str) -> None:
"""
Execute a Python script.
Args:
script_path: Path to the script to execute
script_args: Arguments to pass to the script
cwd: Current working directory for path resolution
Raises:
TargetError: If script execution fails
"""
# Make script path absolute if it isn't already
if not os.path.isabs(script_path):
script_path = os.path.join(cwd, script_path)
if not os.path.isfile(script_path):
raise TargetError(f"Script not found: {script_path}")
# Replace sys.argv to match original script call
sys.argv = [script_path] + script_args
try:
with open(script_path, 'rb') as f:
source_code = f.read()
# Compile and execute the script
code = compile(source_code, script_path, 'exec')
exec(code, {'__name__': '__main__', '__file__': script_path})
except FileNotFoundError as e:
raise TargetError(f"Script file not found: {script_path}") from e
except PermissionError as e:
raise TargetError(f"Permission denied reading script: {script_path}") from e
except SyntaxError as e:
raise TargetError(f"Syntax error in script {script_path}: {e}") from e
except SystemExit:
# SystemExit is normal for scripts
pass
except Exception as e:
raise TargetError(f"Error executing script '{script_path}': {e}") from e
def main() -> NoReturn:
"""
Main coordinator function.
This function coordinates the startup of a target Python process
with the sample profiler by signaling when the process is ready
to be profiled.
"""
try:
# Parse and validate arguments
sync_port, cwd, target_args = _validate_arguments(sys.argv)
# Set up execution environment
_setup_environment(cwd)
# Signal readiness to profiler
_signal_readiness(sync_port)
# Execute the target
if target_args[0] == "-m":
# Module execution
if len(target_args) < 2:
raise ArgumentError("Module name required after -m")
module_name = target_args[1]
module_args = target_args[2:]
_execute_module(module_name, module_args)
else:
# Script execution
script_path = target_args[0]
script_args = target_args[1:]
_execute_script(script_path, script_args, cwd)
except CoordinatorError as e:
print(f"Profiler coordinator error: {e}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print("Interrupted", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Unexpected error in profiler coordinator: {e}", file=sys.stderr)
sys.exit(1)
# Normal exit
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,11 @@
from abc import ABC, abstractmethod
class Collector(ABC):
@abstractmethod
def collect(self, stack_frames):
"""Collect profiling data from stack frames."""
@abstractmethod
def export(self, filename):
"""Export collected data to a file."""

View File

@@ -0,0 +1,81 @@
import collections
import marshal
from .collector import Collector
class PstatsCollector(Collector):
def __init__(self, sample_interval_usec):
self.result = collections.defaultdict(
lambda: dict(total_rec_calls=0, direct_calls=0, cumulative_calls=0)
)
self.stats = {}
self.sample_interval_usec = sample_interval_usec
self.callers = collections.defaultdict(
lambda: collections.defaultdict(int)
)
def collect(self, stack_frames):
for thread_id, frames in stack_frames:
if not frames:
continue
# Process each frame in the stack to track cumulative calls
for frame in frames:
location = (frame.filename, frame.lineno, frame.funcname)
self.result[location]["cumulative_calls"] += 1
# The top frame gets counted as an inline call (directly executing)
top_frame = frames[0]
top_location = (
top_frame.filename,
top_frame.lineno,
top_frame.funcname,
)
self.result[top_location]["direct_calls"] += 1
# Track caller-callee relationships for call graph
for i in range(1, len(frames)):
callee_frame = frames[i - 1]
caller_frame = frames[i]
callee = (
callee_frame.filename,
callee_frame.lineno,
callee_frame.funcname,
)
caller = (
caller_frame.filename,
caller_frame.lineno,
caller_frame.funcname,
)
self.callers[callee][caller] += 1
def export(self, filename):
self.create_stats()
self._dump_stats(filename)
def _dump_stats(self, file):
stats_with_marker = dict(self.stats)
stats_with_marker[("__sampled__",)] = True
with open(file, "wb") as f:
marshal.dump(stats_with_marker, f)
# Needed for compatibility with pstats.Stats
def create_stats(self):
sample_interval_sec = self.sample_interval_usec / 1_000_000
callers = {}
for fname, call_counts in self.result.items():
total = call_counts["direct_calls"] * sample_interval_sec
cumulative_calls = call_counts["cumulative_calls"]
cumulative = cumulative_calls * sample_interval_sec
callers = dict(self.callers.get(fname, {}))
self.stats[fname] = (
call_counts["direct_calls"], # cc = direct calls for sample percentage
cumulative_calls, # nc = cumulative calls for cumulative percentage
total,
cumulative,
callers,
)

View File

@@ -0,0 +1,864 @@
import argparse
import _remote_debugging
import os
import pstats
import socket
import subprocess
import statistics
import sys
import sysconfig
import time
from collections import deque
from _colorize import ANSIColors
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector
_FREE_THREADED_BUILD = sysconfig.get_config_var("Py_GIL_DISABLED") is not None
_MAX_STARTUP_ATTEMPTS = 5
_STARTUP_RETRY_DELAY_SECONDS = 0.1
_HELP_DESCRIPTION = """Sample a process's stack frames and generate profiling data.
Supports the following target modes:
- -p PID: Profile an existing process by PID
- -m MODULE [ARGS...]: Profile a module as python -m module ...
- filename [ARGS...]: Profile the specified script by running it in a subprocess
Examples:
# Profile process 1234 for 10 seconds with default settings
python -m profiling.sampling -p 1234
# Profile a script by running it in a subprocess
python -m profiling.sampling myscript.py arg1 arg2
# Profile a module by running it as python -m module in a subprocess
python -m profiling.sampling -m mymodule arg1 arg2
# Profile with custom interval and duration, save to file
python -m profiling.sampling -i 50 -d 30 -o profile.stats -p 1234
# Generate collapsed stacks for flamegraph
python -m profiling.sampling --collapsed -p 1234
# Profile all threads, sort by total time
python -m profiling.sampling -a --sort-tottime -p 1234
# Profile for 1 minute with 1ms sampling interval
python -m profiling.sampling -i 1000 -d 60 -p 1234
# Show only top 20 functions sorted by direct samples
python -m profiling.sampling --sort-nsamples -l 20 -p 1234
# Profile all threads and save collapsed stacks
python -m profiling.sampling -a --collapsed -o stacks.txt -p 1234
# Profile with real-time sampling statistics
python -m profiling.sampling --realtime-stats -p 1234
# Sort by sample percentage to find most sampled functions
python -m profiling.sampling --sort-sample-pct -p 1234
# Sort by cumulative samples to find functions most on call stack
python -m profiling.sampling --sort-nsamples-cumul -p 1234"""
# Constants for socket synchronization
_SYNC_TIMEOUT = 5.0
_PROCESS_KILL_TIMEOUT = 2.0
_READY_MESSAGE = b"ready"
_RECV_BUFFER_SIZE = 1024
def _run_with_sync(original_cmd):
"""Run a command with socket-based synchronization and return the process."""
# Create a TCP socket for synchronization with better socket options
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_sock:
# Set SO_REUSEADDR to avoid "Address already in use" errors
sync_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sync_sock.bind(("127.0.0.1", 0)) # Let OS choose a free port
sync_port = sync_sock.getsockname()[1]
sync_sock.listen(1)
sync_sock.settimeout(_SYNC_TIMEOUT)
# Get current working directory to preserve it
cwd = os.getcwd()
# Build command using the sync coordinator
target_args = original_cmd[1:] # Remove python executable
cmd = (sys.executable, "-m", "profiling.sampling._sync_coordinator", str(sync_port), cwd) + tuple(target_args)
# Start the process with coordinator
process = subprocess.Popen(cmd)
try:
# Wait for ready signal with timeout
with sync_sock.accept()[0] as conn:
ready_signal = conn.recv(_RECV_BUFFER_SIZE)
if ready_signal != _READY_MESSAGE:
raise RuntimeError(f"Invalid ready signal received: {ready_signal!r}")
except socket.timeout:
# If we timeout, kill the process and raise an error
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=_PROCESS_KILL_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
raise RuntimeError("Process failed to signal readiness within timeout")
return process
class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
if _FREE_THREADED_BUILD:
self.unwinder = _remote_debugging.RemoteUnwinder(
self.pid, all_threads=self.all_threads
)
else:
only_active_threads = bool(self.all_threads)
self.unwinder = _remote_debugging.RemoteUnwinder(
self.pid, only_active_thread=only_active_threads
)
# Track sample intervals and total sample count
self.sample_intervals = deque(maxlen=100)
self.total_samples = 0
self.realtime_stats = False
def sample(self, collector, duration_sec=10):
sample_interval_sec = self.sample_interval_usec / 1_000_000
running_time = 0
num_samples = 0
errors = 0
start_time = next_time = time.perf_counter()
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
while running_time < duration_sec:
current_time = time.perf_counter()
if next_time < current_time:
try:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
errors += 1
except Exception as e:
if not self._is_process_running():
break
raise e from None
# Track actual sampling intervals for real-time stats
if num_samples > 0:
actual_interval = current_time - last_sample_time
self.sample_intervals.append(
1.0 / actual_interval
) # Convert to Hz
self.total_samples += 1
# Print real-time statistics if enabled
if (
self.realtime_stats
and (current_time - last_realtime_update)
>= realtime_update_interval
):
self._print_realtime_stats()
last_realtime_update = current_time
last_sample_time = current_time
num_samples += 1
next_time += sample_interval_sec
running_time = time.perf_counter() - start_time
# Clear real-time stats line if it was being displayed
if self.realtime_stats and len(self.sample_intervals) > 0:
print() # Add newline after real-time stats
print(f"Captured {num_samples} samples in {running_time:.2f} seconds")
print(f"Sample rate: {num_samples / running_time:.2f} samples/sec")
print(f"Error rate: {(errors / num_samples) * 100:.2f}%")
expected_samples = int(duration_sec / sample_interval_sec)
if num_samples < expected_samples:
print(
f"Warning: missed {expected_samples - num_samples} samples "
f"from the expected total of {expected_samples} "
f"({(expected_samples - num_samples) / expected_samples * 100:.2f}%)"
)
def _is_process_running(self):
if sys.platform == "linux" or sys.platform == "darwin":
try:
os.kill(self.pid, 0)
return True
except ProcessLookupError:
return False
elif sys.platform == "win32":
try:
_remote_debugging.RemoteUnwinder(self.pid)
except Exception:
return False
return True
else:
raise ValueError(f"Unsupported platform: {sys.platform}")
def _print_realtime_stats(self):
"""Print real-time sampling statistics."""
if len(self.sample_intervals) < 2:
return
# Calculate statistics on the Hz values (deque automatically maintains rolling window)
hz_values = list(self.sample_intervals)
mean_hz = statistics.mean(hz_values)
min_hz = min(hz_values)
max_hz = max(hz_values)
# Calculate microseconds per sample for all metrics (1/Hz * 1,000,000)
mean_us_per_sample = (1.0 / mean_hz) * 1_000_000 if mean_hz > 0 else 0
min_us_per_sample = (
(1.0 / max_hz) * 1_000_000 if max_hz > 0 else 0
) # Min time = Max Hz
max_us_per_sample = (
(1.0 / min_hz) * 1_000_000 if min_hz > 0 else 0
) # Max time = Min Hz
# Clear line and print stats
print(
f"\r\033[K{ANSIColors.BOLD_BLUE}Real-time sampling stats:{ANSIColors.RESET} "
f"{ANSIColors.YELLOW}Mean: {mean_hz:.1f}Hz ({mean_us_per_sample:.2f}µs){ANSIColors.RESET} "
f"{ANSIColors.GREEN}Min: {min_hz:.1f}Hz ({max_us_per_sample:.2f}µs){ANSIColors.RESET} "
f"{ANSIColors.RED}Max: {max_hz:.1f}Hz ({min_us_per_sample:.2f}µs){ANSIColors.RESET} "
f"{ANSIColors.CYAN}Samples: {self.total_samples}{ANSIColors.RESET}",
end="",
flush=True,
)
def _determine_best_unit(max_value):
"""Determine the best unit (s, ms, μs) and scale factor for a maximum value."""
if max_value >= 1.0:
return "s", 1.0
elif max_value >= 0.001:
return "ms", 1000.0
else:
return "μs", 1000000.0
def print_sampled_stats(
stats, sort=-1, limit=None, show_summary=True, sample_interval_usec=100
):
# Get the stats data
stats_list = []
for func, (
direct_calls,
cumulative_calls,
total_time,
cumulative_time,
callers,
) in stats.stats.items():
stats_list.append(
(
func,
direct_calls,
cumulative_calls,
total_time,
cumulative_time,
callers,
)
)
# Calculate total samples for percentage calculations (using direct_calls)
total_samples = sum(
direct_calls for _, direct_calls, _, _, _, _ in stats_list
)
# Sort based on the requested field
sort_field = sort
if sort_field == -1: # stdname
stats_list.sort(key=lambda x: str(x[0]))
elif sort_field == 0: # nsamples (direct samples)
stats_list.sort(key=lambda x: x[1], reverse=True) # direct_calls
elif sort_field == 1: # tottime
stats_list.sort(key=lambda x: x[3], reverse=True) # total_time
elif sort_field == 2: # cumtime
stats_list.sort(key=lambda x: x[4], reverse=True) # cumulative_time
elif sort_field == 3: # sample%
stats_list.sort(
key=lambda x: (x[1] / total_samples * 100)
if total_samples > 0
else 0,
reverse=True, # direct_calls percentage
)
elif sort_field == 4: # cumul%
stats_list.sort(
key=lambda x: (x[2] / total_samples * 100)
if total_samples > 0
else 0,
reverse=True, # cumulative_calls percentage
)
elif sort_field == 5: # nsamples (cumulative samples)
stats_list.sort(key=lambda x: x[2], reverse=True) # cumulative_calls
# Apply limit if specified
if limit is not None:
stats_list = stats_list[:limit]
# Determine the best unit for time columns based on maximum values
max_total_time = max(
(total_time for _, _, _, total_time, _, _ in stats_list), default=0
)
max_cumulative_time = max(
(cumulative_time for _, _, _, _, cumulative_time, _ in stats_list),
default=0,
)
total_time_unit, total_time_scale = _determine_best_unit(max_total_time)
cumulative_time_unit, cumulative_time_scale = _determine_best_unit(
max_cumulative_time
)
# Define column widths for consistent alignment
col_widths = {
"nsamples": 15, # "nsamples" column (inline/cumulative format)
"sample_pct": 8, # "sample%" column
"tottime": max(12, len(f"tottime ({total_time_unit})")),
"cum_pct": 8, # "cumul%" column
"cumtime": max(12, len(f"cumtime ({cumulative_time_unit})")),
}
# Print header with colors and proper alignment
print(f"{ANSIColors.BOLD_BLUE}Profile Stats:{ANSIColors.RESET}")
header_nsamples = f"{ANSIColors.BOLD_BLUE}{'nsamples':>{col_widths['nsamples']}}{ANSIColors.RESET}"
header_sample_pct = f"{ANSIColors.BOLD_BLUE}{'sample%':>{col_widths['sample_pct']}}{ANSIColors.RESET}"
header_tottime = f"{ANSIColors.BOLD_BLUE}{f'tottime ({total_time_unit})':>{col_widths['tottime']}}{ANSIColors.RESET}"
header_cum_pct = f"{ANSIColors.BOLD_BLUE}{'cumul%':>{col_widths['cum_pct']}}{ANSIColors.RESET}"
header_cumtime = f"{ANSIColors.BOLD_BLUE}{f'cumtime ({cumulative_time_unit})':>{col_widths['cumtime']}}{ANSIColors.RESET}"
header_filename = (
f"{ANSIColors.BOLD_BLUE}filename:lineno(function){ANSIColors.RESET}"
)
print(
f"{header_nsamples} {header_sample_pct} {header_tottime} {header_cum_pct} {header_cumtime} {header_filename}"
)
# Print each line with proper alignment
for (
func,
direct_calls,
cumulative_calls,
total_time,
cumulative_time,
callers,
) in stats_list:
# Calculate percentages
sample_pct = (
(direct_calls / total_samples * 100) if total_samples > 0 else 0
)
cum_pct = (
(cumulative_calls / total_samples * 100)
if total_samples > 0
else 0
)
# Format values with proper alignment - always use A/B format
nsamples_str = f"{direct_calls}/{cumulative_calls}"
nsamples_str = f"{nsamples_str:>{col_widths['nsamples']}}"
sample_pct_str = f"{sample_pct:{col_widths['sample_pct']}.1f}"
tottime = f"{total_time * total_time_scale:{col_widths['tottime']}.3f}"
cum_pct_str = f"{cum_pct:{col_widths['cum_pct']}.1f}"
cumtime = f"{cumulative_time * cumulative_time_scale:{col_widths['cumtime']}.3f}"
# Format the function name with colors
func_name = (
f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:"
f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}("
f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})"
)
# Print the formatted line with consistent spacing
print(
f"{nsamples_str} {sample_pct_str} {tottime} {cum_pct_str} {cumtime} {func_name}"
)
# Print legend
print(f"\n{ANSIColors.BOLD_BLUE}Legend:{ANSIColors.RESET}")
print(
f" {ANSIColors.YELLOW}nsamples{ANSIColors.RESET}: Direct/Cumulative samples (direct executing / on call stack)"
)
print(
f" {ANSIColors.YELLOW}sample%{ANSIColors.RESET}: Percentage of total samples this function was directly executing"
)
print(
f" {ANSIColors.YELLOW}tottime{ANSIColors.RESET}: Estimated total time spent directly in this function"
)
print(
f" {ANSIColors.YELLOW}cumul%{ANSIColors.RESET}: Percentage of total samples when this function was on the call stack"
)
print(
f" {ANSIColors.YELLOW}cumtime{ANSIColors.RESET}: Estimated cumulative time (including time in called functions)"
)
print(
f" {ANSIColors.YELLOW}filename:lineno(function){ANSIColors.RESET}: Function location and name"
)
def _format_func_name(func):
"""Format function name with colors."""
return (
f"{ANSIColors.GREEN}{func[0]}{ANSIColors.RESET}:"
f"{ANSIColors.YELLOW}{func[1]}{ANSIColors.RESET}("
f"{ANSIColors.CYAN}{func[2]}{ANSIColors.RESET})"
)
def _print_top_functions(stats_list, title, key_func, format_line, n=3):
"""Print top N functions sorted by key_func with formatted output."""
print(f"\n{ANSIColors.BOLD_BLUE}{title}:{ANSIColors.RESET}")
sorted_stats = sorted(stats_list, key=key_func, reverse=True)
for stat in sorted_stats[:n]:
if line := format_line(stat):
print(f" {line}")
# Print summary of interesting functions if enabled
if show_summary and stats_list:
print(
f"\n{ANSIColors.BOLD_BLUE}Summary of Interesting Functions:{ANSIColors.RESET}"
)
# Aggregate stats by fully qualified function name (ignoring line numbers)
func_aggregated = {}
for (
func,
direct_calls,
cumulative_calls,
total_time,
cumulative_time,
callers,
) in stats_list:
# Use filename:function_name as the key to get fully qualified name
qualified_name = f"{func[0]}:{func[2]}"
if qualified_name not in func_aggregated:
func_aggregated[qualified_name] = [
0,
0,
0,
0,
] # direct_calls, cumulative_calls, total_time, cumulative_time
func_aggregated[qualified_name][0] += direct_calls
func_aggregated[qualified_name][1] += cumulative_calls
func_aggregated[qualified_name][2] += total_time
func_aggregated[qualified_name][3] += cumulative_time
# Convert aggregated data back to list format for processing
aggregated_stats = []
for qualified_name, (
prim_calls,
total_calls,
total_time,
cumulative_time,
) in func_aggregated.items():
# Parse the qualified name back to filename and function name
if ":" in qualified_name:
filename, func_name = qualified_name.rsplit(":", 1)
else:
filename, func_name = "", qualified_name
# Create a dummy func tuple with filename and function name for display
dummy_func = (filename, "", func_name)
aggregated_stats.append(
(
dummy_func,
prim_calls,
total_calls,
total_time,
cumulative_time,
{},
)
)
# Determine best units for summary metrics
max_total_time = max(
(total_time for _, _, _, total_time, _, _ in aggregated_stats),
default=0,
)
max_cumulative_time = max(
(
cumulative_time
for _, _, _, _, cumulative_time, _ in aggregated_stats
),
default=0,
)
total_unit, total_scale = _determine_best_unit(max_total_time)
cumulative_unit, cumulative_scale = _determine_best_unit(
max_cumulative_time
)
# Functions with highest direct/cumulative ratio (hot spots)
def format_hotspots(stat):
func, direct_calls, cumulative_calls, total_time, _, _ = stat
if direct_calls > 0 and cumulative_calls > 0:
ratio = direct_calls / cumulative_calls
direct_pct = (
(direct_calls / total_samples * 100)
if total_samples > 0
else 0
)
return (
f"{ratio:.3f} direct/cumulative ratio, "
f"{direct_pct:.1f}% direct samples: {_format_func_name(func)}"
)
return None
_print_top_functions(
aggregated_stats,
"Functions with Highest Direct/Cumulative Ratio (Hot Spots)",
key_func=lambda x: (x[1] / x[2]) if x[2] > 0 else 0,
format_line=format_hotspots,
)
# Functions with highest call frequency (cumulative/direct difference)
def format_call_frequency(stat):
func, direct_calls, cumulative_calls, total_time, _, _ = stat
if cumulative_calls > direct_calls:
call_frequency = cumulative_calls - direct_calls
cum_pct = (
(cumulative_calls / total_samples * 100)
if total_samples > 0
else 0
)
return (
f"{call_frequency:d} indirect calls, "
f"{cum_pct:.1f}% total stack presence: {_format_func_name(func)}"
)
return None
_print_top_functions(
aggregated_stats,
"Functions with Highest Call Frequency (Indirect Calls)",
key_func=lambda x: x[2] - x[1], # Sort by (cumulative - direct)
format_line=format_call_frequency,
)
# Functions with highest cumulative-to-direct multiplier (call magnification)
def format_call_magnification(stat):
func, direct_calls, cumulative_calls, total_time, _, _ = stat
if direct_calls > 0 and cumulative_calls > direct_calls:
multiplier = cumulative_calls / direct_calls
indirect_calls = cumulative_calls - direct_calls
return (
f"{multiplier:.1f}x call magnification, "
f"{indirect_calls:d} indirect calls from {direct_calls:d} direct: {_format_func_name(func)}"
)
return None
_print_top_functions(
aggregated_stats,
"Functions with Highest Call Magnification (Cumulative/Direct)",
key_func=lambda x: (x[2] / x[1])
if x[1] > 0
else 0, # Sort by cumulative/direct ratio
format_line=format_call_magnification,
)
def sample(
pid,
*,
sort=2,
sample_interval_usec=100,
duration_sec=10,
filename=None,
all_threads=False,
limit=None,
show_summary=True,
output_format="pstats",
realtime_stats=False,
):
profiler = SampleProfiler(
pid, sample_interval_usec, all_threads=all_threads
)
profiler.realtime_stats = realtime_stats
collector = None
match output_format:
case "pstats":
collector = PstatsCollector(sample_interval_usec)
case "collapsed":
collector = CollapsedStackCollector()
filename = filename or f"collapsed.{pid}.txt"
case _:
raise ValueError(f"Invalid output format: {output_format}")
profiler.sample(collector, duration_sec)
if output_format == "pstats" and not filename:
stats = pstats.SampledStats(collector).strip_dirs()
print_sampled_stats(
stats, sort, limit, show_summary, sample_interval_usec
)
else:
collector.export(filename)
def _validate_collapsed_format_args(args, parser):
# Check for incompatible pstats options
invalid_opts = []
# Get list of pstats-specific options
pstats_options = {"sort": None, "limit": None, "no_summary": False}
# Find the default values from the argument definitions
for action in parser._actions:
if action.dest in pstats_options and hasattr(action, "default"):
pstats_options[action.dest] = action.default
# Check if any pstats-specific options were provided by comparing with defaults
for opt, default in pstats_options.items():
if getattr(args, opt) != default:
invalid_opts.append(opt.replace("no_", ""))
if invalid_opts:
parser.error(
f"The following options are only valid with --pstats format: {', '.join(invalid_opts)}"
)
# Set default output filename for collapsed format only if we have a PID
# For module/script execution, this will be set later with the subprocess PID
if not args.outfile and args.pid is not None:
args.outfile = f"collapsed.{args.pid}.txt"
def wait_for_process_and_sample(pid, sort_value, args):
"""Sample the process immediately since it has already signaled readiness."""
# Set default collapsed filename with subprocess PID if not already set
filename = args.outfile
if not filename and args.format == "collapsed":
filename = f"collapsed.{pid}.txt"
sample(
pid,
sort=sort_value,
sample_interval_usec=args.interval,
duration_sec=args.duration,
filename=filename,
all_threads=args.all_threads,
limit=args.limit,
show_summary=not args.no_summary,
output_format=args.format,
realtime_stats=args.realtime_stats,
)
def main():
# Create the main parser
parser = argparse.ArgumentParser(
description=_HELP_DESCRIPTION,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# Target selection
target_group = parser.add_mutually_exclusive_group(required=False)
target_group.add_argument(
"-p", "--pid", type=int, help="Process ID to sample"
)
target_group.add_argument(
"-m", "--module",
help="Run and profile a module as python -m module [ARGS...]"
)
parser.add_argument(
"args",
nargs=argparse.REMAINDER,
help="Script to run and profile, with optional arguments"
)
# Sampling options
sampling_group = parser.add_argument_group("Sampling configuration")
sampling_group.add_argument(
"-i",
"--interval",
type=int,
default=100,
help="Sampling interval in microseconds (default: 100)",
)
sampling_group.add_argument(
"-d",
"--duration",
type=int,
default=10,
help="Sampling duration in seconds (default: 10)",
)
sampling_group.add_argument(
"-a",
"--all-threads",
action="store_true",
help="Sample all threads in the process instead of just the main thread",
)
sampling_group.add_argument(
"--realtime-stats",
action="store_true",
default=False,
help="Print real-time sampling statistics (Hz, mean, min, max, stdev) during profiling",
)
# Output format selection
output_group = parser.add_argument_group("Output options")
output_format = output_group.add_mutually_exclusive_group()
output_format.add_argument(
"--pstats",
action="store_const",
const="pstats",
dest="format",
default="pstats",
help="Generate pstats output (default)",
)
output_format.add_argument(
"--collapsed",
action="store_const",
const="collapsed",
dest="format",
help="Generate collapsed stack traces for flamegraphs",
)
output_group.add_argument(
"-o",
"--outfile",
help="Save output to a file (if omitted, prints to stdout for pstats, "
"or saves to collapsed.<pid>.txt for collapsed format)",
)
# pstats-specific options
pstats_group = parser.add_argument_group("pstats format options")
sort_group = pstats_group.add_mutually_exclusive_group()
sort_group.add_argument(
"--sort-nsamples",
action="store_const",
const=0,
dest="sort",
help="Sort by number of direct samples (nsamples column)",
)
sort_group.add_argument(
"--sort-tottime",
action="store_const",
const=1,
dest="sort",
help="Sort by total time (tottime column)",
)
sort_group.add_argument(
"--sort-cumtime",
action="store_const",
const=2,
dest="sort",
help="Sort by cumulative time (cumtime column, default)",
)
sort_group.add_argument(
"--sort-sample-pct",
action="store_const",
const=3,
dest="sort",
help="Sort by sample percentage (sample%% column)",
)
sort_group.add_argument(
"--sort-cumul-pct",
action="store_const",
const=4,
dest="sort",
help="Sort by cumulative sample percentage (cumul%% column)",
)
sort_group.add_argument(
"--sort-nsamples-cumul",
action="store_const",
const=5,
dest="sort",
help="Sort by cumulative samples (nsamples column, cumulative part)",
)
sort_group.add_argument(
"--sort-name",
action="store_const",
const=-1,
dest="sort",
help="Sort by function name",
)
pstats_group.add_argument(
"-l",
"--limit",
type=int,
help="Limit the number of rows in the output",
default=15,
)
pstats_group.add_argument(
"--no-summary",
action="store_true",
help="Disable the summary section in the output",
)
args = parser.parse_args()
# Validate format-specific arguments
if args.format == "collapsed":
_validate_collapsed_format_args(args, parser)
sort_value = args.sort if args.sort is not None else 2
if args.module is not None and not args.module:
parser.error("argument -m/--module: expected one argument")
# Validate that we have exactly one target type
# Note: args can be present with -m (module arguments) but not as standalone script
has_pid = args.pid is not None
has_module = args.module is not None
has_script = bool(args.args) and args.module is None
target_count = sum([has_pid, has_module, has_script])
if target_count == 0:
parser.error("one of the arguments -p/--pid -m/--module or script name is required")
elif target_count > 1:
parser.error("only one target type can be specified: -p/--pid, -m/--module, or script")
if args.pid:
sample(
args.pid,
sample_interval_usec=args.interval,
duration_sec=args.duration,
filename=args.outfile,
all_threads=args.all_threads,
limit=args.limit,
sort=sort_value,
show_summary=not args.no_summary,
output_format=args.format,
realtime_stats=args.realtime_stats,
)
elif args.module or args.args:
if args.module:
cmd = (sys.executable, "-m", args.module, *args.args)
else:
cmd = (sys.executable, *args.args)
# Use synchronized process startup
process = _run_with_sync(cmd)
# Process has already signaled readiness, start sampling immediately
try:
wait_for_process_and_sample(process.pid, sort_value, args)
finally:
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=2)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,37 @@
import collections
import os
from .collector import Collector
class StackTraceCollector(Collector):
def __init__(self):
self.call_trees = []
self.function_samples = collections.defaultdict(int)
def collect(self, stack_frames):
for thread_id, frames in stack_frames:
if frames:
# Store the complete call stack (reverse order - root first)
call_tree = list(reversed(frames))
self.call_trees.append(call_tree)
# Count samples per function
for frame in frames:
self.function_samples[frame] += 1
class CollapsedStackCollector(StackTraceCollector):
def export(self, filename):
stack_counter = collections.Counter()
for call_tree in self.call_trees:
# Call tree is already in root->leaf order
stack_str = ";".join(
f"{os.path.basename(f[0])}:{f[2]}:{f[1]}" for f in call_tree
)
stack_counter[stack_str] += 1
with open(filename, "w") as f:
for stack, count in stack_counter.items():
f.write(f"{stack} {count}\n")
print(f"Collapsed stack output written to {filename}")

View File

@@ -0,0 +1,219 @@
"""Tracing profiler for Python.
This module provides deterministic profiling of Python programs by tracing
every function call and return.
"""
__all__ = ("run", "runctx", "Profile")
import _lsprof
import importlib.machinery
import importlib.util
import io
from profiling.tracing._utils import _Utils
# ____________________________________________________________
# Simple interface
def run(statement, filename=None, sort=-1):
"""Run statement under profiler optionally saving results in filename
This function takes a single argument that can be passed to the
"exec" statement, and an optional file name. In all cases this
routine attempts to "exec" its first argument and gather profiling
statistics from the execution. If no file name is present, then this
function automatically prints a simple profiling report, sorted by the
standard name string (file/line/function-name) that is presented in
each line.
"""
return _Utils(Profile).run(statement, filename, sort)
def runctx(statement, globals, locals, filename=None, sort=-1):
"""Run statement under profiler, supplying your own globals and locals,
optionally saving results in filename.
statement and filename have the same semantics as profile.run
"""
return _Utils(Profile).runctx(statement, globals, locals,
filename, sort)
# ____________________________________________________________
class Profile(_lsprof.Profiler):
"""Profile(timer=None, timeunit=None, subcalls=True, builtins=True)
Builds a profiler object using the specified timer function.
The default timer is a fast built-in one based on real time.
For custom timer functions returning integers, timeunit can
be a float specifying a scale (i.e. how long each integer unit
is, in seconds).
"""
# Most of the functionality is in the base class.
# This subclass only adds convenient and backward-compatible methods.
def print_stats(self, sort=-1):
import pstats
if not isinstance(sort, tuple):
sort = (sort,)
pstats.Stats(self).strip_dirs().sort_stats(*sort).print_stats()
def dump_stats(self, file):
import marshal
with open(file, 'wb') as f:
self.create_stats()
marshal.dump(self.stats, f)
def create_stats(self):
self.disable()
self.snapshot_stats()
def snapshot_stats(self):
entries = self.getstats()
self.stats = {}
callersdicts = {}
# call information
for entry in entries:
func = label(entry.code)
nc = entry.callcount # ncalls column of pstats (before '/')
cc = nc - entry.reccallcount # ncalls column of pstats (after '/')
tt = entry.inlinetime # tottime column of pstats
ct = entry.totaltime # cumtime column of pstats
callers = {}
callersdicts[id(entry.code)] = callers
self.stats[func] = cc, nc, tt, ct, callers
# subcall information
for entry in entries:
if entry.calls:
func = label(entry.code)
for subentry in entry.calls:
try:
callers = callersdicts[id(subentry.code)]
except KeyError:
continue
nc = subentry.callcount
cc = nc - subentry.reccallcount
tt = subentry.inlinetime
ct = subentry.totaltime
if func in callers:
prev = callers[func]
nc += prev[0]
cc += prev[1]
tt += prev[2]
ct += prev[3]
callers[func] = nc, cc, tt, ct
# The following two methods can be called by clients to use
# a profiler to profile a statement, given as a string.
def run(self, cmd):
import __main__
dict = __main__.__dict__
return self.runctx(cmd, dict, dict)
def runctx(self, cmd, globals, locals):
self.enable()
try:
exec(cmd, globals, locals)
finally:
self.disable()
return self
# This method is more useful to profile a single function call.
def runcall(self, func, /, *args, **kw):
self.enable()
try:
return func(*args, **kw)
finally:
self.disable()
def __enter__(self):
self.enable()
return self
def __exit__(self, *exc_info):
self.disable()
# ____________________________________________________________
def label(code):
if isinstance(code, str):
return ('~', 0, code) # built-in functions ('~' sorts at the end)
else:
return (code.co_filename, code.co_firstlineno, code.co_name)
# ____________________________________________________________
def main():
import os
import sys
import runpy
import pstats
from optparse import OptionParser
usage = "cProfile.py [-o output_file_path] [-s sort] [-m module | scriptfile] [arg] ..."
parser = OptionParser(usage=usage)
parser.allow_interspersed_args = False
parser.add_option('-o', '--outfile', dest="outfile",
help="Save stats to <outfile>", default=None)
parser.add_option('-s', '--sort', dest="sort",
help="Sort order when printing to stdout, based on pstats.Stats class",
default=2,
choices=sorted(pstats.Stats.sort_arg_dict_default))
parser.add_option('-m', dest="module", action="store_true",
help="Profile a library module", default=False)
if not sys.argv[1:]:
parser.print_usage()
sys.exit(2)
(options, args) = parser.parse_args()
sys.argv[:] = args
# The script that we're profiling may chdir, so capture the absolute path
# to the output file at startup.
if options.outfile is not None:
options.outfile = os.path.abspath(options.outfile)
if len(args) > 0:
if options.module:
code = "run_module(modname, run_name='__main__')"
globs = {
'run_module': runpy.run_module,
'modname': args[0]
}
else:
progname = args[0]
sys.path.insert(0, os.path.dirname(progname))
with io.open_code(progname) as fp:
code = compile(fp.read(), progname, 'exec')
spec = importlib.machinery.ModuleSpec(name='__main__', loader=None,
origin=progname)
module = importlib.util.module_from_spec(spec)
# Set __main__ so that importing __main__ in the profiled code will
# return the same namespace that the code is executing under.
sys.modules['__main__'] = module
# Ensure that we're using the same __dict__ instance as the module
# for the global variables so that updates to globals are reflected
# in the module's namespace.
globs = module.__dict__
globs.update({
'__spec__': spec,
'__file__': spec.origin,
'__name__': spec.name,
'__package__': None,
'__cached__': None,
})
try:
runctx(code, globs, None, options.outfile, options.sort)
except BrokenPipeError as exc:
# Prevent "Exception ignored" during interpreter shutdown.
sys.stdout = None
sys.exit(exc.errno)
else:
parser.print_usage()
return parser
# When invoked as main program, invoke the profiler on a script
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,6 @@
"""Run the tracing profiler from the command line."""
from profiling.tracing import main
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,32 @@
class _Utils:
"""Support class for utility functions which are shared by
profile.py and cProfile.py modules.
Not supposed to be used directly.
"""
def __init__(self, profiler):
self.profiler = profiler
def run(self, statement, filename, sort):
prof = self.profiler()
try:
prof.run(statement)
except SystemExit:
pass
finally:
self._show(prof, filename, sort)
def runctx(self, statement, globals, locals, filename, sort):
prof = self.profiler()
try:
prof.runctx(statement, globals, locals)
except SystemExit:
pass
finally:
self._show(prof, filename, sort)
def _show(self, prof, filename, sort):
if filename is not None:
prof.dump_stats(filename)
else:
prof.print_stats(sort)