Diffstat (limited to '.github/workflows/dependencies/updater.py')
-rw-r--r--  .github/workflows/dependencies/updater.py  606
1 file changed, 606 insertions, 0 deletions
diff --git a/.github/workflows/dependencies/updater.py b/.github/workflows/dependencies/updater.py
new file mode 100644
index 000000000..783161d6c
--- /dev/null
+++ b/.github/workflows/dependencies/updater.py
@@ -0,0 +1,606 @@
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import timeit
+from copy import deepcopy
+from typing import Literal, NotRequired, Optional, TypedDict
+
+import requests
+import yaml
+from semver import Version
+
+# Get TMP_DIR variable from environment
+TMP_DIR = os.path.join(os.environ.get("TMP_DIR", "/tmp"), "ohmyzsh")
+# Relative path to dependencies.yml file
+DEPS_YAML_FILE = ".github/dependencies.yml"
+# Dry run flag
+DRY_RUN = os.environ.get("DRY_RUN", "0") == "1"
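+# When DRY_RUN is enabled, `gh` commands are prefixed with `echo` instead of
+# being executed (see CommandRunner.run_or_fail below).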
+
+# utils for tag comparison
+BASEVERSION = re.compile(
+ r"""[vV]?
+ (?P<major>(0|[1-9])\d*)
+ (\.
+ (?P<minor>(0|[1-9])\d*)
+ (\.
+ (?P<patch>(0|[1-9])\d*)
+ )?
+ )?
+ """,
+ re.VERBOSE,
+)
+
+
+def coerce(version: str) -> Optional[Version]:
+ match = BASEVERSION.search(version)
+ if not match:
+ return None
+
+    # BASEVERSION looks for `MAJOR.minor.patch` in the given string;
+    # groupdict() returns None for any missing part (for example `2.1` has no patch)
+ ver = {
+ key: 0 if value is None else value for key, value in match.groupdict().items()
+ }
+ # Version takes `major`, `minor`, `patch` arguments
+ ver = Version(**ver) # pyright: ignore[reportArgumentType]
+ return ver
+
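+# Illustrative behaviour of coerce() above: missing minor/patch parts are
+# filled with 0, and strings without a version number yield None, e.g.
+#   coerce("v2.1")      -> semver Version "2.1.0"
+#   coerce("not-a-tag") -> None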
+
+class CodeTimer:
+ def __init__(self, name=None):
+ self.name = " '" + name + "'" if name else ""
+
+ def __enter__(self):
+ self.start = timeit.default_timer()
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.took = (timeit.default_timer() - self.start) * 1000.0
+ print("Code block" + self.name + " took: " + str(self.took) + " ms")
+
+
+### YAML representation
+def str_presenter(dumper, data):
+ """
+ Configures yaml for dumping multiline strings
+ Ref: https://stackoverflow.com/a/33300001
+ """
+ if len(data.splitlines()) > 1: # check for multiline string
+ return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|")
+ return dumper.represent_scalar("tag:yaml.org,2002:str", data)
+
+
+yaml.add_representer(str, str_presenter)
+yaml.representer.SafeRepresenter.add_representer(str, str_presenter)
+
+
+# Types
+class DependencyDict(TypedDict):
+ repo: str
+ branch: str
+ version: str
+ precopy: NotRequired[str]
+ postcopy: NotRequired[str]
+
+
+class DependencyYAML(TypedDict):
+ dependencies: dict[str, DependencyDict]
+
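+# For illustration only, a .github/dependencies.yml entry matching the shape
+# above might look like this (the repo/branch/version values are hypothetical):
+#
+#   dependencies:
+#     plugins/example:
+#       repo: owner/example
+#       branch: master
+#       version: tag:v1.0.0   # or a commit sha when tracking a branch
+#       precopy: |            # optional, runs inside the cloned upstream repo
+#         rm -rf tests
+#       postcopy: |           # optional, runs inside the copied path here
+#         mv example.sh example.plugin.zsh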
+
+class UpdateStatusFalse(TypedDict):
+ has_updates: Literal[False]
+
+
+class UpdateStatusTrue(TypedDict):
+ has_updates: Literal[True]
+ version: str
+ compare_url: str
+ head_ref: str
+ head_url: str
+
+
+class CommandRunner:
+ class Exception(Exception):
+ def __init__(self, message, returncode, stage, stdout, stderr):
+ super().__init__(message)
+ self.returncode = returncode
+ self.stage = stage
+ self.stdout = stdout
+ self.stderr = stderr
+
+ @staticmethod
+ def run_or_fail(command: list[str], stage: str, *args, **kwargs):
+ if DRY_RUN and command[0] == "gh":
+ command.insert(0, "echo")
+
+ result = subprocess.run(command, *args, capture_output=True, **kwargs)
+
+ if result.returncode != 0:
+ raise CommandRunner.Exception(
+ f"{stage} command failed with exit code {result.returncode}",
+ returncode=result.returncode,
+ stage=stage,
+ stdout=result.stdout.decode("utf-8"),
+ stderr=result.stderr.decode("utf-8"),
+ )
+
+ return result
+
+
+class DependencyStore:
+ store: DependencyYAML = {"dependencies": {}}
+
+ @staticmethod
+ def set(data: DependencyYAML):
+ DependencyStore.store = data
+
+ @staticmethod
+ def update_dependency_version(path: str, version: str) -> DependencyYAML:
+ with CodeTimer(f"store deepcopy: {path}"):
+ store_copy = deepcopy(DependencyStore.store)
+
+ dependency = store_copy["dependencies"].get(path)
+ if dependency is None:
+ raise ValueError(f"Dependency {path} {version} not found")
+ dependency["version"] = version
+ store_copy["dependencies"][path] = dependency
+
+ return store_copy
+
+ @staticmethod
+ def write_store(file: str, data: DependencyYAML):
+ with open(file, "w") as yaml_file:
+ yaml.safe_dump(data, yaml_file, sort_keys=False)
+
+
+class Dependency:
+ def __init__(self, path: str, values: DependencyDict):
+ self.path = path
+ self.values = values
+
+ self.name: str = ""
+ self.desc: str = ""
+ self.kind: str = ""
+
+ match path.split("/"):
+ case ["plugins", name]:
+ self.name = name
+ self.kind = "plugin"
+ self.desc = f"{name} plugin"
+ case ["themes", name]:
+ self.name = name.replace(".zsh-theme", "")
+ self.kind = "theme"
+ self.desc = f"{self.name} theme"
+ case _:
+ self.name = self.desc = path
+
+ def __str__(self):
+ output: str = ""
+ for key in DependencyDict.__dict__["__annotations__"].keys():
+ if key not in self.values:
+ output += f"{key}: None\n"
+ continue
+
+ value = self.values[key]
+ if "\n" not in value:
+ output += f"{key}: {value}\n"
+ else:
+ output += f"{key}:\n "
+ output += value.replace("\n", "\n ", value.count("\n") - 1)
+ return output
+
+ def update_or_notify(self):
+ # Print dependency settings
+ print(f"Processing {self.desc}...", file=sys.stderr)
+ print(self, file=sys.stderr)
+
+ # Check for updates
+ repo = self.values["repo"]
+ remote_branch = self.values["branch"]
+ version = self.values["version"]
+ is_tag = version.startswith("tag:")
+
+ try:
+ with CodeTimer(f"update check: {repo}"):
+ if is_tag:
+ status = GitHub.check_newer_tag(repo, version.replace("tag:", ""))
+ else:
+ status = GitHub.check_updates(repo, remote_branch, version)
+
+ if status["has_updates"] is True:
+ short_sha = status["head_ref"][:8]
+ new_version = status["version"] if is_tag else short_sha
+
+ try:
+ branch_name = f"update/{self.path}/{new_version}"
+
+ # Create new branch
+ branch = Git.checkout_or_create_branch(branch_name)
+
+ # Update dependency files
+ self.__apply_upstream_changes()
+
+ if not Git.repo_is_clean():
+ # Update dependencies.yml file
+ self.__update_yaml(
+ f"tag:{new_version}" if is_tag else status["version"]
+ )
+
+ # Add all changes and commit
+ has_new_commit = Git.add_and_commit(self.name, new_version)
+
+ if has_new_commit:
+ # Push changes to remote
+ Git.push(branch)
+
+ # Create GitHub PR
+ GitHub.create_pr(
+ branch,
+ f"chore({self.name}): update to version {new_version}",
+ f"""## Description
+
+Update for **{self.desc}**: update to version [{new_version}]({status["head_url"]}).
+Check out the [list of changes]({status["compare_url"]}).
+""",
+ )
+
+ # Clean up repository
+ Git.clean_repo()
+ except (CommandRunner.Exception, shutil.Error) as e:
+ # Handle exception on automatic update
+ match type(e):
+ case CommandRunner.Exception:
+ # Print error message
+ print(
+ f"Error running {e.stage} command: {e.returncode}", # pyright: ignore[reportAttributeAccessIssue]
+ file=sys.stderr,
+ )
+ print(e.stderr, file=sys.stderr) # pyright: ignore[reportAttributeAccessIssue]
+ case shutil.Error:
+ print(f"Error copying files: {e}", file=sys.stderr)
+
+ try:
+ Git.clean_repo()
+ except CommandRunner.Exception as e:
+ print(
+ f"Error reverting repository to clean state: {e}",
+ file=sys.stderr,
+ )
+ sys.exit(1)
+
+ # Create a GitHub issue to notify maintainer
+ title = f"{self.path}: update to {new_version}"
+ body = f"""## Description
+
+There is a new version of `{self.name}` {self.kind} available.
+
+New version: [{new_version}]({status["head_url"]})
+Check out the [list of changes]({status["compare_url"]}).
+"""
+
+ print("Creating GitHub issue", file=sys.stderr)
+ print(f"{title}\n\n{body}", file=sys.stderr)
+ GitHub.create_issue(title, body)
+ except Exception as e:
+ print(e, file=sys.stderr)
+
+ def __update_yaml(self, new_version: str) -> None:
+ dep_yaml = DependencyStore.update_dependency_version(self.path, new_version)
+ DependencyStore.write_store(DEPS_YAML_FILE, dep_yaml)
+
+ def __apply_upstream_changes(self) -> None:
+        # Patterns to ignore when copying files from the upstream repo
+ GLOBAL_IGNORE = [".git", ".github", ".gitignore"]
+
+ path = os.path.abspath(self.path)
+ precopy = self.values.get("precopy")
+ postcopy = self.values.get("postcopy")
+
+ repo = self.values["repo"]
+ branch = self.values["branch"]
+ remote_url = f"https://github.com/{repo}.git"
+ repo_dir = os.path.join(TMP_DIR, repo)
+
+ # Clone repository
+ Git.clone(remote_url, branch, repo_dir, reclone=True)
+
+ # Run precopy on tmp repo
+ if precopy is not None:
+ print("Running precopy script:", end="\n ", file=sys.stderr)
+ print(
+ precopy.replace("\n", "\n ", precopy.count("\n") - 1), file=sys.stderr
+ )
+ CommandRunner.run_or_fail(
+ ["bash", "-c", precopy], cwd=repo_dir, stage="Precopy"
+ )
+
+ # Copy files from upstream repo
+ print(f"Copying files from {repo_dir} to {path}", file=sys.stderr)
+ shutil.copytree(
+ repo_dir,
+ path,
+ dirs_exist_ok=True,
+ ignore=shutil.ignore_patterns(*GLOBAL_IGNORE),
+ )
+
+ # Run postcopy on our repository
+ if postcopy is not None:
+ print("Running postcopy script:", end="\n ", file=sys.stderr)
+ print(
+ postcopy.replace("\n", "\n ", postcopy.count("\n") - 1),
+ file=sys.stderr,
+ )
+ CommandRunner.run_or_fail(
+ ["bash", "-c", postcopy], cwd=path, stage="Postcopy"
+ )
+
+
+class Git:
+ default_branch = "master"
+
+ @staticmethod
+ def clone(remote_url: str, branch: str, repo_dir: str, reclone=False):
+        # If the repo needs to be fresh, remove any existing clone
+ if reclone and os.path.exists(repo_dir):
+ shutil.rmtree(repo_dir)
+
+ # Clone repo in tmp directory and checkout branch
+ if not os.path.exists(repo_dir):
+ print(
+ f"Cloning {remote_url} to {repo_dir} and checking out {branch}",
+ file=sys.stderr,
+ )
+ CommandRunner.run_or_fail(
+ ["git", "clone", "--depth=1", "-b", branch, remote_url, repo_dir],
+ stage="Clone",
+ )
+
+ @staticmethod
+ def checkout_or_create_branch(branch_name: str):
+ # Get current branch name
+ result = CommandRunner.run_or_fail(
+ ["git", "rev-parse", "--abbrev-ref", "HEAD"], stage="GetDefaultBranch"
+ )
+ Git.default_branch = result.stdout.decode("utf-8").strip()
+
+        # Check out the branch (creating it if needed) and return its name
+ try:
+ # try to checkout already existing branch
+ CommandRunner.run_or_fail(
+ ["git", "checkout", branch_name], stage="CreateBranch"
+ )
+ except CommandRunner.Exception:
+ # otherwise create new branch
+ CommandRunner.run_or_fail(
+ ["git", "checkout", "-b", branch_name], stage="CreateBranch"
+ )
+ return branch_name
+
+ @staticmethod
+ def repo_is_clean() -> bool:
+ """
+ Returns `True` if the repo is clean.
+ Returns `False` if the repo is dirty.
+ """
+ try:
+ CommandRunner.run_or_fail(
+ ["git", "diff", "--exit-code"], stage="CheckRepoClean"
+ )
+ return True
+ except CommandRunner.Exception:
+ return False
+
+ @staticmethod
+ def add_and_commit(scope: str, version: str) -> bool:
+ """
+        Returns `True` if there were changes and they were committed.
+        Returns `False` if the repo was clean and no changes were committed.
+ """
+ if Git.repo_is_clean():
+ return False
+
+ user_name = os.environ.get("GIT_APP_NAME")
+ user_email = os.environ.get("GIT_APP_EMAIL")
+
+ # Add all files to git staging
+ CommandRunner.run_or_fail(["git", "add", "-A", "-v"], stage="AddFiles")
+
+        # Copy the environment and neutralize global/system git config
+ clean_env = os.environ.copy()
+ clean_env["LANG"] = "C.UTF-8"
+ clean_env["GIT_CONFIG_GLOBAL"] = "/dev/null"
+ clean_env["GIT_CONFIG_NOSYSTEM"] = "1"
+
+ # Commit with settings above
+ CommandRunner.run_or_fail(
+ [
+ "git",
+ "-c",
+ f"user.name={user_name}",
+ "-c",
+ f"user.email={user_email}",
+ "commit",
+ "-m",
+ f"chore({scope}): update to {version}",
+ ],
+ stage="CreateCommit",
+ env=clean_env,
+ )
+ return True
+
+ @staticmethod
+ def push(branch: str):
+ CommandRunner.run_or_fail(
+ ["git", "push", "-u", "origin", branch], stage="PushBranch"
+ )
+
+ @staticmethod
+ def clean_repo():
+ CommandRunner.run_or_fail(
+ ["git", "reset", "--hard", "HEAD"], stage="ResetRepository"
+ )
+ CommandRunner.run_or_fail(
+ ["git", "checkout", Git.default_branch], stage="CheckoutDefaultBranch"
+ )
+
+
+class GitHub:
+ @staticmethod
+ def check_newer_tag(repo, current_tag) -> UpdateStatusFalse | UpdateStatusTrue:
+ # GET /repos/:owner/:repo/git/refs/tags
+ url = f"https://api.github.com/repos/{repo}/git/refs/tags"
+
+ # Send a GET request to the GitHub API
+ response = requests.get(url)
+ current_version = coerce(current_tag)
+ if current_version is None:
+ raise ValueError(
+ f"Stored {current_version} from {repo} does not follow semver"
+ )
+
+ # If the request was successful
+ if response.status_code == 200:
+ # Parse the JSON response
+ data = response.json()
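+            # The refs/tags response is a list of refs; only the fields used
+            # below matter here (values are illustrative):
+            #   [{"ref": "refs/tags/v1.2.3", "object": {"sha": "<commit sha>"}}, ...]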
+
+ if len(data) == 0:
+ return {
+ "has_updates": False,
+ }
+
+ latest_ref = None
+ latest_version: Optional[Version] = None
+ for ref in data:
+                # parse the tag's version; GitHub returns it as a plain git ref
+ tag_version = coerce(ref["ref"].replace("refs/tags/", ""))
+ if tag_version is None:
+                    # we skip every tag that is not semver-compliant
+ continue
+ if latest_version is None or tag_version.compare(latest_version) > 0:
+ # if we have a "greater" semver version, set it as latest
+ latest_version = tag_version
+ latest_ref = ref
+
+ # raise if no valid semver tag is found
+ if latest_ref is None or latest_version is None:
+ raise ValueError(f"No tags following semver found in {repo}")
+
+            # strip the refs/tags/ prefix, since GitHub returns the tag as a plain git ref
+ latest_tag = latest_ref["ref"].replace("refs/tags/", "")
+
+ if latest_version.compare(current_version) <= 0:
+ return {
+ "has_updates": False,
+ }
+
+ return {
+ "has_updates": True,
+ "version": latest_tag,
+ "compare_url": f"https://github.com/{repo}/compare/{current_tag}...{latest_tag}",
+ "head_ref": latest_ref["object"]["sha"],
+ "head_url": f"https://github.com/{repo}/releases/tag/{latest_tag}",
+ }
+ else:
+ # If the request was not successful, raise an exception
+ raise Exception(
+ f"GitHub API request failed with status code {response.status_code}: {response.json()}"
+ )
+
+ @staticmethod
+ def check_updates(repo, branch, version) -> UpdateStatusFalse | UpdateStatusTrue:
+ url = f"https://api.github.com/repos/{repo}/compare/{version}...{branch}"
+
+ # Send a GET request to the GitHub API
+ response = requests.get(url)
+
+ # If the request was successful
+ if response.status_code == 200:
+ # Parse the JSON response
+ data = response.json()
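+            # Only the fields of the compare response used below matter here
+            # (values are illustrative):
+            #   {"status": "ahead", "permalink_url": "<compare url>",
+            #    "commits": [{"sha": "<sha>", "html_url": "<commit url>"}, ...]}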
+
+ # If the base is behind the head, there is a newer version
+ has_updates = data["status"] != "identical"
+
+ if not has_updates:
+ return {
+ "has_updates": False,
+ }
+
+ return {
+ "has_updates": data["status"] != "identical",
+ "version": data["commits"][-1]["sha"],
+ "compare_url": data["permalink_url"],
+ "head_ref": data["commits"][-1]["sha"],
+ "head_url": data["commits"][-1]["html_url"],
+ }
+ else:
+ # If the request was not successful, raise an exception
+ raise Exception(
+ f"GitHub API request failed with status code {response.status_code}: {response.json()}"
+ )
+
+ @staticmethod
+ def create_issue(title: str, body: str) -> None:
+ cmd = ["gh", "issue", "create", "-t", title, "-b", body]
+ CommandRunner.run_or_fail(cmd, stage="CreateIssue")
+
+ @staticmethod
+ def create_pr(branch: str, title: str, body: str) -> None:
+        # first, check whether a PR for this branch is already open
+ check_cmd = [
+ "gh",
+ "pr",
+ "list",
+ "--state",
+ "open",
+ "--head",
+ branch,
+ "--json",
+ "title",
+ ]
+        # the return code is 0 even when no PRs are found
+ output = json.loads(
+ CommandRunner.run_or_fail(check_cmd, stage="CheckPullRequestOpen")
+ .stdout.decode("utf-8")
+ .strip()
+ )
+        # a PR for this branch is already open; nothing to do
+ if len(output) > 0:
+ return
+ cmd = [
+ "gh",
+ "pr",
+ "create",
+ "-B",
+ Git.default_branch,
+ "-H",
+ branch,
+ "-t",
+ title,
+ "-b",
+ body,
+ ]
+ CommandRunner.run_or_fail(cmd, stage="CreatePullRequest")
+
+
+def main():
+ # Load the YAML file
+ with open(DEPS_YAML_FILE, "r") as yaml_file:
+ data: DependencyYAML = yaml.safe_load(yaml_file)
+
+ if "dependencies" not in data:
+ raise Exception("dependencies.yml not properly formatted")
+
+    # Cache the parsed YAML in the DependencyStore
+ DependencyStore.set(data)
+
+ dependencies = data["dependencies"]
+ for path in dependencies:
+ dependency = Dependency(path, dependencies[path])
+ dependency.update_or_notify()
+
+
+if __name__ == "__main__":
+ main()