From d3cfa1e92a037e7748141ca20e20299ee1132733 Mon Sep 17 00:00:00 2001 From: ford prior Date: Fri, 20 Sep 2024 10:00:46 -0400 Subject: [PATCH] Update stats_cli.py --- aws_doc_sdk_examples_tools/shgit.py | 58 +++++++ aws_doc_sdk_examples_tools/stats_cli.py | 198 ++++++++++-------------- 2 files changed, 142 insertions(+), 114 deletions(-) create mode 100644 aws_doc_sdk_examples_tools/shgit.py diff --git a/aws_doc_sdk_examples_tools/shgit.py b/aws_doc_sdk_examples_tools/shgit.py new file mode 100644 index 0000000..ef321e8 --- /dev/null +++ b/aws_doc_sdk_examples_tools/shgit.py @@ -0,0 +1,58 @@ +from pathlib import Path +from subprocess import run +from typing import Optional + + +def make_long_flags(flags: dict[str, str]) -> list[str]: + return [f"--{key}={val}" for key, val in flags.items()] + + +class Sh: + def __init__(self, + args: Optional[str | list[str]] = None, + check=True, + capture_output=False, + cwd: str | bytes | Path = Path("."), + log=False): + if args is None: + self.args = [] + elif isinstance(args, str): + self.args = [args] + else: + self.args = args + self.check = check + self.capture_output = capture_output + self.cwd: str | bytes | Path = cwd + self.log = log + + def __call__(self, + *args: str, + capture_output=False, + check=None, + cwd: Optional[str | bytes | Path] = None, + **kwargs: str): + check = check or self.check + cwd = cwd or self.cwd + capture_output = capture_output or self.capture_output + long_args = make_long_flags(kwargs) + cmd: list[str] = [*self.args, *args, *long_args] + if self.log: + print(cmd) + return run(cmd, + capture_output=capture_output, + check=check, + shell=False, + cwd=cwd) + + def __getattr__(self, arg: str): + return Sh( + [*self.args, arg], + self.check, + self.capture_output, + self.cwd, + self.log + ) + + +sh = Sh() +git = Sh('git') diff --git a/aws_doc_sdk_examples_tools/stats_cli.py b/aws_doc_sdk_examples_tools/stats_cli.py index b3cbcee..d87dfd8 100644 --- a/aws_doc_sdk_examples_tools/stats_cli.py +++ 
class Capturing(list):
    """List that collects the lines written to ``sys.stdout`` in a ``with`` block.

    On entry ``sys.stdout`` is swapped for an in-memory buffer; on exit the
    buffered text is split into lines, appended to this list, and the
    previous ``sys.stdout`` is put back.
    """

    def __enter__(self):
        self._stdout = sys.stdout
        sys.stdout = self._stringio = StringIO()
        return self

    def __exit__(self, *_args):
        try:
            self.extend(self._stringio.getvalue().splitlines())
        finally:
            # Always hand stdout back, even if collecting the lines failed.
            del self._stringio  # free up some memory
            sys.stdout = self._stdout
def _stdout_text(result):
    """Return ``result.stdout`` as stripped text; '' when nothing was captured."""
    out = result.stdout
    if out is None:
        return ""
    if isinstance(out, bytes):
        # Captured output is bytes unless text mode was requested; str() on
        # bytes would yield the "b'...'" repr instead of the content.
        out = out.decode("utf-8", errors="replace")
    return out.strip()


def get_commit_hash_for_age(git, age):
    """
    Get the commit hash for a specific age in the repository's history.

    Args:
        git (Sh(git)): an shgit with the repo loaded as its CWD
        age (str): Value passed to ``git rev-list --before=``, e.g. "2 months ago".

    Returns:
        str: The commit hash corresponding to the specified age, or None if not found.
    """
    result = git('rev-list', '-1', f"--before={age}", 'HEAD', capture_output=True)
    return _stdout_text(result) or None


def run_commands_in_repo(git, commit_hash, age):
    """
    Run Git log and a Python command in a specific commit of a repository.

    Checks out ``commit_hash``, prints its hash, author and date, then runs
    the stats command on the repository and prints what it wrote to stdout.

    Args:
        git (Sh(git)): an shgit with the repo loaded as its CWD
        commit_hash (str): The commit hash to run the commands in.
        age (str): The age of the commit being processed.
    """
    # Checkout repo at the specific commit
    git.checkout("--force", commit_hash)

    # Retrieve commit details. The format carries no quote characters: Sh runs
    # commands with shell=False, so quotes would come back verbatim in the output.
    log_output = _stdout_text(
        git('log', '-n', '1', commit_hash,
            pretty='format:%H|%an|%aI', capture_output=True)
    )

    if not log_output:
        print(f"{Fore.RED}No commit found for {age} in {git.cwd}")
        return

    # The hash is the first field and the date the last; split from both ends
    # so a '|' inside the author name stays part of the name.
    sha, rest = log_output.split('|', 1)
    name, date = rest.rsplit('|', 1)

    print(f"{Fore.MAGENTA}Commit for {age}: {sha}, Author: {name}, Date: {date}")

    # Run the Python command on the repository
    print(f"{Fore.CYAN}Running stats command for repository: {git.cwd}")
    with Capturing() as output:
        # NOTE(review): git.cwd may be a Path; confirm stats() accepts that.
        stats([git.cwd])

    if output:
        # `output` is a list of captured lines; print them as text.
        print("\n".join(output))
    print("###########################")
""" age_content_dict = {} - cloned_repos = {} # To track cloned repositories and their directories + cloned_repos: dict[str, Path] = {} # To track cloned repositories and their directories - # Create a temporary directory for the clones - with tempfile.TemporaryDirectory() as tmp_dir: - # Fetch the configuration file from the main repository for each age + # Create a tmp directory for the clones + with tempfile.TemporaryDirectory() as tmp: + tmp_dir = Path(tmp) + # Fetch the config file from the main repository for each age for age in ages: - print(f"{Style.BRIGHT}{Fore.BLUE}#############################################################") - print(f"{Style.BRIGHT}{Fore.BLUE}######################## {age.upper()} ##############################") - print(f"{Style.BRIGHT}{Fore.BLUE}#############################################################") + print(f"{Style.BRIGHT}{Fore.BLUE}" + "#" * 61) + print(f"{Style.BRIGHT}{Fore.BLUE}" + f" {age.upper()} ".center(61, "#")) + print(f"{Style.BRIGHT}{Fore.BLUE}" + "#" * 61) # Get the commit hash for the main repository - main_commit_hash = get_commit_hash_for_age(repo_path, age) + main_commit_hash = get_commit_hash_for_age(git, age) if not main_commit_hash: print(f"{Fore.RED}Skipping {age} because commit hash could not be retrieved for the main repository.") continue # Get the YAML configuration from that commit - file_content = run_git_command(f"git show {main_commit_hash}:{file_path}", cwd=repo_path) + file_content = str(git.show(f"{main_commit_hash}:{file_path}").stdout) if file_content: try: @@ -163,30 +123,9 @@ def get_file_from_commits_and_clone(repo_path, file_path, ages): mirrors = yaml_content.get('mirrors', {}) age_content_dict[age] = mirrors - # Clone or reuse repositories for each mirror + # Clone or reuse repos for each mirror for mirror_name, mirror_info in mirrors.items(): - repo_url = mirror_info.get('git_mirror') - branch = mirror_info.get('branch') - dir_name = mirror_info.get('dir') - - # Check if this 
def process_mirror(cloned_repos, tmp_dir: Path, age, mirror_name, mirror_info):
    """
    Clone (or reuse) one mirror's repository and run the stats commands on it.

    Args:
        cloned_repos (dict): Maps repo URL to its clone directory; a new
            entry is added here when this call makes a clone.
        tmp_dir (Path): Directory under which new clones are created.
        age (str): The age of the commit to process.
        mirror_name (str): Name of the mirror, used in messages.
        mirror_info (dict): Mirror entry; its 'git_mirror', 'branch' and
            'dir' keys are read.
    """
    repo_url = mirror_info.get('git_mirror')
    branch = mirror_info.get('branch')
    dir_name = mirror_info.get('dir')

    # Without a URL or a target directory there is nothing to clone; bail out
    # instead of failing on `tmp_dir / None` or a None command argument.
    if not repo_url or not dir_name:
        print(f"{Fore.YELLOW}Skipping {mirror_name} in {age}: mirror entry has no 'git_mirror' or 'dir'.")
        return

    # Check if repo has already been cloned
    if repo_url not in cloned_repos:
        # If not, clone the repository to a subfolder in the temp directory
        clone_dir = tmp_dir / dir_name
        # Launch the clone from tmp_dir: clone_dir does not exist until the
        # clone finishes, and a child process cannot start in a missing cwd.
        Sh('git', cwd=tmp_dir).clone(repo_url, str(clone_dir))
        if branch:
            Sh('git', cwd=clone_dir).checkout(branch)

        # Mark this repo as cloned
        cloned_repos[repo_url] = clone_dir
    else:
        print(f"{Fore.YELLOW}Reusing cloned repository {repo_url} for mirror {mirror_name}.")

    git = Sh('git', cwd=cloned_repos[repo_url])
    # Get the specific commit hash for this repo at this age
    repo_commit_hash = get_commit_hash_for_age(git, age)
    if repo_commit_hash:
        # Run commands in the cloned repository for the specific commit hash
        run_commands_in_repo(git, repo_commit_hash, age)
    else:
        print(f"{Fore.YELLOW}Skipping {mirror_name} in {age} due to failure in retrieving commit hash.")