mesa/bin/ci/gitlab_gql.py

304 lines
9.2 KiB
Python
Executable File

#!/usr/bin/env python3
import re
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, Namespace
from dataclasses import dataclass, field
from os import getenv
from pathlib import Path
from typing import Any, Iterable, Optional, Pattern, Union
import yaml
from filecache import DAY, filecache
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport
from graphql import DocumentNode
Dag = dict[str, list[str]]
TOKEN_DIR = Path(getenv("XDG_CONFIG_HOME") or Path.home() / ".config")
def get_token_from_default_dir() -> str:
try:
token_file = TOKEN_DIR / "gitlab-token"
return token_file.resolve()
except FileNotFoundError as ex:
print(
f"Could not find {token_file}, please provide a token file as an argument"
)
raise ex
def get_project_root_dir():
root_path = Path(__file__).parent.parent.parent.resolve()
gitlab_file = root_path / ".gitlab-ci.yml"
assert gitlab_file.exists()
return root_path
@dataclass
class GitlabGQL:
_transport: Any = field(init=False)
client: Client = field(init=False)
url: str = "https://gitlab.freedesktop.org/api/graphql"
token: Optional[str] = None
def __post_init__(self):
self._setup_gitlab_gql_client()
def _setup_gitlab_gql_client(self) -> Client:
# Select your transport with a defined url endpoint
headers = {}
if self.token:
headers["Authorization"] = f"Bearer {self.token}"
self._transport = AIOHTTPTransport(url=self.url, headers=headers)
# Create a GraphQL client using the defined transport
self.client = Client(
transport=self._transport, fetch_schema_from_transport=True
)
@filecache(DAY)
def query(
self, gql_file: Union[Path, str], params: dict[str, Any]
) -> dict[str, Any]:
# Provide a GraphQL query
source_path = Path(__file__).parent
pipeline_query_file = source_path / gql_file
query: DocumentNode
with open(pipeline_query_file, "r") as f:
pipeline_query = f.read()
query = gql(pipeline_query)
# Execute the query on the transport
return self.client.execute(query, variable_values=params)
def invalidate_query_cache(self):
self.query._db.clear()
def create_job_needs_dag(
gl_gql: GitlabGQL, params
) -> tuple[Dag, dict[str, dict[str, Any]]]:
result = gl_gql.query("pipeline_details.gql", params)
dag = {}
jobs = {}
pipeline = result["project"]["pipeline"]
if not pipeline:
raise RuntimeError(f"Could not find any pipelines for {params}")
for stage in pipeline["stages"]["nodes"]:
for stage_job in stage["groups"]["nodes"]:
for job in stage_job["jobs"]["nodes"]:
needs = job.pop("needs")["nodes"]
jobs[job["name"]] = job
dag[job["name"]] = {node["name"] for node in needs}
for job, needs in dag.items():
needs: set
partial = True
while partial:
next_depth = {n for dn in needs for n in dag[dn]}
partial = not needs.issuperset(next_depth)
needs = needs.union(next_depth)
dag[job] = needs
return dag, jobs
def filter_dag(dag: Dag, regex: Pattern) -> Dag:
return {job: needs for job, needs in dag.items() if re.match(regex, job)}
def print_dag(dag: Dag) -> None:
for job, needs in dag.items():
print(f"{job}:")
print(f"\t{' '.join(needs)}")
print()
def fetch_merged_yaml(gl_gql: GitlabGQL, params) -> dict[Any]:
gitlab_yml_file = get_project_root_dir() / ".gitlab-ci.yml"
content = Path(gitlab_yml_file).read_text().strip()
params["content"] = content
raw_response = gl_gql.query("job_details.gql", params)
if merged_yaml := raw_response["ciConfig"]["mergedYaml"]:
return yaml.safe_load(merged_yaml)
gl_gql.invalidate_query_cache()
raise ValueError(
"""
Could not fetch any content for merged YAML,
please verify if the git SHA exists in remote.
Maybe you forgot to `git push`? """
)
def recursive_fill(job, relationship_field, target_data, acc_data: dict, merged_yaml):
if relatives := job.get(relationship_field):
if isinstance(relatives, str):
relatives = [relatives]
for relative in relatives:
parent_job = merged_yaml[relative]
acc_data = recursive_fill(parent_job, acc_data, merged_yaml)
acc_data |= job.get(target_data, {})
return acc_data
def get_variables(job, merged_yaml, project_path, sha) -> dict[str, str]:
p = get_project_root_dir() / ".gitlab-ci" / "image-tags.yml"
image_tags = yaml.safe_load(p.read_text())
variables = image_tags["variables"]
variables |= merged_yaml["variables"]
variables |= job["variables"]
variables["CI_PROJECT_PATH"] = project_path
variables["CI_PROJECT_NAME"] = project_path.split("/")[1]
variables["CI_REGISTRY_IMAGE"] = "registry.freedesktop.org/${CI_PROJECT_PATH}"
variables["CI_COMMIT_SHA"] = sha
while recurse_among_variables_space(variables):
pass
return variables
# Based on: https://stackoverflow.com/a/2158532/1079223
def flatten(xs):
for x in xs:
if isinstance(x, Iterable) and not isinstance(x, (str, bytes)):
yield from flatten(x)
else:
yield x
def get_full_script(job) -> list[str]:
script = []
for script_part in ("before_script", "script", "after_script"):
script.append(f"# {script_part}")
lines = flatten(job.get(script_part, []))
script.extend(lines)
script.append("")
return script
def recurse_among_variables_space(var_graph) -> bool:
updated = False
for var, value in var_graph.items():
value = str(value)
dep_vars = []
if match := re.findall(r"(\$[{]?[\w\d_]*[}]?)", value):
all_dep_vars = [v.lstrip("${").rstrip("}") for v in match]
# print(value, match, all_dep_vars)
dep_vars = [v for v in all_dep_vars if v in var_graph]
for dep_var in dep_vars:
dep_value = str(var_graph[dep_var])
new_value = var_graph[var]
new_value = new_value.replace(f"${{{dep_var}}}", dep_value)
new_value = new_value.replace(f"${dep_var}", dep_value)
var_graph[var] = new_value
updated |= dep_value != new_value
return updated
def get_job_final_definition(job_name, merged_yaml, project_path, sha):
job = merged_yaml[job_name]
variables = get_variables(job, merged_yaml, project_path, sha)
print("# --------- variables ---------------")
for var, value in sorted(variables.items()):
print(f"export {var}={value!r}")
# TODO: Recurse into needs to get full script
# TODO: maybe create a extra yaml file to avoid too much rework
script = get_full_script(job)
print()
print()
print("# --------- full script ---------------")
print("\n".join(script))
if image := variables.get("MESA_IMAGE"):
print()
print()
print("# --------- container image ---------------")
print(image)
def parse_args() -> Namespace:
parser = ArgumentParser(
formatter_class=ArgumentDefaultsHelpFormatter,
description="CLI and library with utility functions to debug jobs via Gitlab GraphQL",
epilog=f"""Example:
{Path(__file__).name} --rev $(git rev-parse HEAD) --print-job-dag""",
)
parser.add_argument("-pp", "--project-path", type=str, default="mesa/mesa")
parser.add_argument("--sha", "--rev", type=str, required=True)
parser.add_argument(
"--regex",
type=str,
required=False,
help="Regex pattern for the job name to be considered",
)
parser.add_argument("--print-dag", action="store_true", help="Print job needs DAG")
parser.add_argument(
"--print-merged-yaml",
action="store_true",
help="Print the resulting YAML for the specific SHA",
)
parser.add_argument(
"--print-job-manifest", type=str, help="Print the resulting job data"
)
parser.add_argument(
"--gitlab-token-file",
type=str,
default=get_token_from_default_dir(),
help="force GitLab token, otherwise it's read from $XDG_CONFIG_HOME/gitlab-token",
)
args = parser.parse_args()
args.gitlab_token = Path(args.gitlab_token_file).read_text()
return args
def main():
args = parse_args()
gl_gql = GitlabGQL(token=args.gitlab_token)
if args.print_dag:
dag, jobs = create_job_needs_dag(
gl_gql, {"projectPath": args.project_path, "sha": args.sha}
)
if args.regex:
dag = filter_dag(dag, re.compile(args.regex))
print_dag(dag)
if args.print_merged_yaml:
print(
fetch_merged_yaml(
gl_gql, {"projectPath": args.project_path, "sha": args.sha}
)
)
if args.print_job_manifest:
merged_yaml = fetch_merged_yaml(
gl_gql, {"projectPath": args.project_path, "sha": args.sha}
)
get_job_final_definition(
args.print_job_manifest, merged_yaml, args.project_path, args.sha
)
if __name__ == "__main__":
main()